diff options
| -rw-r--r-- | bgpd/bgp_route.c | 28 | ||||
| -rw-r--r-- | tests/topotests/bgp-route-map/bgp_route_map_topo1.json | 187 | ||||
| -rwxr-xr-x | tests/topotests/bgp-route-map/bgp_route_map_topo2.json | 316 | ||||
| -rwxr-xr-x | tests/topotests/bgp-route-map/test_route_map_topo1.py | 1361 | ||||
| -rwxr-xr-x | tests/topotests/bgp-route-map/test_route_map_topo2.py | 3916 | ||||
| -rw-r--r-- | tests/topotests/lib/bgp.py | 144 | ||||
| -rw-r--r-- | tests/topotests/lib/common_config.py | 458 | ||||
| -rw-r--r-- | tests/topotests/lib/topojson.py | 2 |
8 files changed, 6367 insertions, 45 deletions
diff --git a/bgpd/bgp_route.c b/bgpd/bgp_route.c index 37360a559a..d083586c93 100644 --- a/bgpd/bgp_route.c +++ b/bgpd/bgp_route.c @@ -6562,6 +6562,7 @@ DEFUN (aggregate_address_mask, argv_find(argv, argc, "A.B.C.D", &idx); char *prefix = argv[idx]->arg; char *mask = argv[idx + 1]->arg; + bool rmap_found; char *rmap = NULL; int as_set = argv_find(argv, argc, "as-set", &idx) ? AGGREGATE_AS_SET : 0; @@ -6570,8 +6571,8 @@ DEFUN (aggregate_address_mask, ? AGGREGATE_SUMMARY_ONLY : 0; - argv_find(argv, argc, "WORD", &idx); - if (idx) + rmap_found = argv_find(argv, argc, "WORD", &idx); + if (rmap_found) rmap = argv[idx]->arg; char prefix_str[BUFSIZ]; @@ -6588,14 +6589,16 @@ DEFUN (aggregate_address_mask, DEFUN (no_aggregate_address, no_aggregate_address_cmd, - "no aggregate-address A.B.C.D/M [<as-set [summary-only]|summary-only [as-set]>]", + "no aggregate-address A.B.C.D/M [<as-set [summary-only]|summary-only [as-set]>] [route-map WORD]", NO_STR "Configure BGP aggregate entries\n" "Aggregate prefix\n" "Generate AS set path information\n" "Filter more specific routes from updates\n" "Filter more specific routes from updates\n" - "Generate AS set path information\n") + "Generate AS set path information\n" + "Apply route map to aggregate network\n" + "Name of route map\n") { int idx = 0; argv_find(argv, argc, "A.B.C.D/M", &idx); @@ -6605,7 +6608,7 @@ DEFUN (no_aggregate_address, DEFUN (no_aggregate_address_mask, no_aggregate_address_mask_cmd, - "no aggregate-address A.B.C.D A.B.C.D [<as-set [summary-only]|summary-only [as-set]>]", + "no aggregate-address A.B.C.D A.B.C.D [<as-set [summary-only]|summary-only [as-set]>] [route-map WORD]", NO_STR "Configure BGP aggregate entries\n" "Aggregate address\n" @@ -6613,7 +6616,9 @@ DEFUN (no_aggregate_address_mask, "Generate AS set path information\n" "Filter more specific routes from updates\n" "Filter more specific routes from updates\n" - "Generate AS set path information\n") + "Generate AS set path information\n" + "Apply route map to aggregate network\n" + "Name of route map\n") { int idx = 0; argv_find(argv, argc, "A.B.C.D", &idx); @@ -6647,6 +6652,7 @@ DEFUN (ipv6_aggregate_address, argv_find(argv, argc, "X:X::X:X/M", &idx); char *prefix = argv[idx]->arg; char *rmap = NULL; + bool rmap_found; int as_set = argv_find(argv, argc, "as-set", &idx) ? AGGREGATE_AS_SET : 0; @@ -6655,8 +6661,8 @@ DEFUN (ipv6_aggregate_address, ? AGGREGATE_SUMMARY_ONLY : 0; - argv_find(argv, argc, "WORD", &idx); - if (idx) + rmap_found = argv_find(argv, argc, "WORD", &idx); + if (rmap_found) rmap = argv[idx]->arg; return bgp_aggregate_set(vty, prefix, AFI_IP6, SAFI_UNICAST, rmap, @@ -6665,14 +6671,16 @@ DEFUN (ipv6_aggregate_address, DEFUN (no_ipv6_aggregate_address, no_ipv6_aggregate_address_cmd, - "no aggregate-address X:X::X:X/M [<as-set [summary-only]|summary-only [as-set]>]", + "no aggregate-address X:X::X:X/M [<as-set [summary-only]|summary-only [as-set]>] [route-map WORD]", NO_STR "Configure BGP aggregate entries\n" "Aggregate prefix\n" "Generate AS set path information\n" "Filter more specific routes from updates\n" "Filter more specific routes from updates\n" - "Generate AS set path information\n") + "Generate AS set path information\n" + "Apply route map to aggregate network\n" + "Name of route map\n") { int idx = 0; argv_find(argv, argc, "X:X::X:X/M", &idx); diff --git a/tests/topotests/bgp-route-map/bgp_route_map_topo1.json b/tests/topotests/bgp-route-map/bgp_route_map_topo1.json new file mode 100644 index 0000000000..e89263961d --- /dev/null +++ b/tests/topotests/bgp-route-map/bgp_route_map_topo1.json @@ -0,0 +1,187 @@ +{ + "address_types": ["ipv4","ipv6"], + "ipv4base":"10.0.0.0", + "ipv4mask":30, + "ipv6base":"fd00::", + "ipv6mask":64, + "link_ip_start":{"ipv4":"10.0.0.0", "v4mask":30, "ipv6":"fd00::", "v6mask":64}, + "lo_prefix":{"ipv4":"1.0.", "v4mask":32, "ipv6":"2001:DB8:F::", "v6mask":128}, + "routers":{ + "r1":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r2":{"ipv4":"auto", "ipv6":"auto"}, + "r3":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + }, + "r3": { + "dest_link": { + "r1": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + }, + "r3": { + "dest_link": { + "r1": {} + } + } + } + } + } + } + } + }, + "r2":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r1":{"ipv4":"auto", "ipv6":"auto"}, + "r3":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + } + } + } + } + } + } + }, + "r3":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r1":{"ipv4":"auto", "ipv6":"auto"}, + "r2":{"ipv4":"auto", "ipv6":"auto"}, + "r4":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {} + } + }, + "r2": { + "dest_link": { + "r3": {} + } + }, + "r4": { + "dest_link": { + "r3": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {} + } + }, + "r2": { + "dest_link": { + "r3": {} + } + }, + "r4": { + "dest_link": { + "r3": {} + } + } + } + } + } + } + } + }, + "r4":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r3":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"200", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r4": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r4": {} + } + } + } + } + } + } + } + } + } +} diff --git a/tests/topotests/bgp-route-map/bgp_route_map_topo2.json b/tests/topotests/bgp-route-map/bgp_route_map_topo2.json new file mode 100755 index 0000000000..c22a4c3ea7 --- /dev/null +++ b/tests/topotests/bgp-route-map/bgp_route_map_topo2.json @@ -0,0 +1,316 @@ +{ + "address_types": ["ipv4", "ipv6"], + "ipv4base": "10.0.0.0", + "ipv4mask": 30, + "ipv6base": "fd00::", + "ipv6mask": 64, + "link_ip_start": { + "ipv4": "10.0.0.0", + "v4mask": 30, + "ipv6": "fd00::", + "v6mask": 64 + }, + "lo_prefix": { + "ipv4": "1.0.", + "v4mask": 32, + "ipv6": "2001:DB8:F::", + "v6mask": 128 + }, + "routers": { + "r1": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + }, + "r3": { + "dest_link": { + "r1": {} + } + } + }, + "redistribute": [{ + "redist_type": "static" + }, + { + "redist_type": "connected" + } + ] + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + }, + "r3": { + "dest_link": { + "r1": {} + } + } + }, + "redistribute": [{ + "redist_type": "static" + }, + { + "redist_type": "connected" + } + ] + } + } + } + }, + + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 2, + "next_hop": "10.0.0.2" + }, + { + "network": "1::1/128", + "no_of_ip": 2, + "next_hop": "fd00::2" + }] + }, + "r2": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + } + } + } + } + } + } + }, + "r3": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r4": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r5": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {} + } + }, + "r2": { + "dest_link": { + "r3": {} + } + }, + "r4": { + "dest_link": { + "r3": {} + } + }, + "r5": { + "dest_link": { + "r3": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {} + } + }, + "r2": { + "dest_link": { + "r3": {} + } + }, + "r4": { + "dest_link": { + "r3": {} + } + }, + "r5": { + "dest_link": { + "r3": {} + } + } + } + } + } + } + } + }, + "r4": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "200", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r4": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r4": {} + } + } + } + } + } + } + } + }, + "r5": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "300", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r5": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r5": {} + } + } + } + } + } + } + } + } + } +} diff --git a/tests/topotests/bgp-route-map/test_route_map_topo1.py b/tests/topotests/bgp-route-map/test_route_map_topo1.py new file mode 100755 index 0000000000..86ec6c82d2 --- /dev/null +++ b/tests/topotests/bgp-route-map/test_route_map_topo1.py @@ -0,0 +1,1361 @@ +#!/usr/bin/env python + +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, +# Inc. ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +################################# +# TOPOLOGY +################################# +""" + + +-------+ + +------- | R2 | + | +-------+ + | | + +-------+ | + | R1 | | + +-------+ | + | | + | +-------+ +-------+ + +---------- | R3 |----------| R4 | + +-------+ +-------+ + +""" + +################################# +# TEST SUMMARY +################################# +""" +Following tests are covered to test route-map functionality: +TC_34: + Verify if route-maps is applied in both inbound and + outbound direction to same neighbor/interface. +TC_36: + Test permit/deny statements operation in route-maps with a + permutation and combination of permit/deny in prefix-lists +TC_35: + Test multiple sequence numbers in a single route-map for different + match/set clauses. +TC_37: + Test add/remove route-maps with multiple set + clauses and without any match statement.(Set only) +TC_38: + Test add/remove route-maps with multiple match + clauses and without any set statement.(Match only) +""" + +import sys +import json +import time +import pytest +import inspect +import os +from time import sleep + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, '../')) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib import topotest +from lib.topogen import Topogen, get_topogen +from mininet.topo import Topo + +# Required to instantiate the topology builder class. +from lib.topojson import * +from lib.common_config import ( + start_topology, write_test_header, + write_test_footer, verify_bgp_community, + verify_rib, delete_route_maps, create_bgp_community_lists, + interface_status, create_route_maps, create_prefix_lists, + verify_route_maps, check_address_types, + shutdown_bringup_interface, verify_prefix_lists, reset_config_on_routers) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence, create_router_bgp, + clear_bgp_and_verify, verify_bgp_attributes) +from lib.topojson import build_topo_from_json, build_config_from_json + + +# Global variables +bgp_convergence = False +BGP_CONVERGENCE = False +ADDR_TYPES = check_address_types() +# Reading the data from JSON File for topology and configuration creation +jsonFile = "{}/bgp_route_map_topo1.json".format(CWD) +try: + with open(jsonFile, 'r') as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + +# Global variables +bgp_convergence = False +NETWORK = { + "ipv4": ["11.0.20.1/32", "20.0.20.1/32"], + "ipv6": ["1::1/128", "2::1/128"] +} +MASK = {"ipv4": "32", "ipv6": "128"} +NEXT_HOP = { + "ipv4": "10.0.0.2", + "ipv6": "fd00::2" +} +ADDR_TYPES = check_address_types() + + +class CreateTopo(Topo): + """ + Test topology builder + + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + """Build function""" + tgen = get_topogen(self) + + # Building topology from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + global ADDR_TYPES + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(CreateTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + # Checking BGP convergence + global bgp_convergence + + # Don"t run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Api call verify whether BGP is converged + bgp_convergence = verify_bgp_convergence(tgen, topo) + assert bgp_convergence is True, ("setup_module :Failed \n Error:" + " {}".format(bgp_convergence)) + + logger.info("Running setup_module() done") + + +def teardown_module(): + """ + Teardown the pytest environment + + * `mod`: module name + """ + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info("Testsuite end time: {}". + format(time.asctime(time.localtime(time.time())))) + logger.info("=" * 40) + + +def test_route_map_inbound_outbound_same_neighbor_p0(request): + """ + TC_34: + Verify if route-maps is applied in both inbound and + outbound direction to same neighbor/interface. + """ + + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + + # Don"t run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + for adt in ADDR_TYPES: + + # Create Static routes + input_dict = { + "r1": { + "static_routes": [ + { + "network": NETWORK[adt][0], + "no_of_ip": 9, + "next_hop": NEXT_HOP[adt], + } + ] + } + } + + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + input_dict_1 = { + "r1": { + "bgp": { + "local_as": 100, + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + input_dict_2 = { + "r4": { + "static_routes": [ + { + "network": NETWORK[adt][1], + "no_of_ip": 9, + "next_hop": NEXT_HOP[adt], + } + ] + } + } + + result = create_static_routes(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + input_dict_5 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_5) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + input_dict_2 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1_ipv4": [{ + "seqid": 10, + "action": "permit", + "network": NETWORK["ipv4"][0] + }], + "pf_list_2_ipv4": [{ + "seqid": 10, + "action": "permit", + "network": NETWORK["ipv4"][1] + }] + }, + "ipv6": { + "pf_list_1_ipv6": [{ + "seqid": 100, + "action": "permit", + "network": NETWORK["ipv6"][0] + }], + "pf_list_2_ipv6": [{ + "seqid": 100, + "action": "permit", + "network": NETWORK["ipv6"][1] + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_6 = { + "r3": { + "route_maps": { + "rmap_match_tag_1_{}".format(addr_type): [{ + "action": "deny", + "match": { + addr_type: { + "prefix_lists": + "pf_list_1_{}".format(addr_type) + } + } + }], + "rmap_match_tag_2_{}".format(addr_type): [{ + "action": "permit", + "match": { + addr_type: { + "prefix_lists": + "pf_list_2_{}".format(addr_type) + } + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_6) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_7 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r3": { + "route_maps": [ + {"name": + "rmap_match_tag_1_ipv4", + "direction": "in"}, + {"name": + "rmap_match_tag_1_ipv4", + "direction": "out"} + ] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r3": { + "route_maps": [ + {"name": + "rmap_match_tag_1_ipv6", + "direction": "in"}, + {"name": + "rmap_match_tag_1_ipv6", + "direction": "out"} + ] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_7) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + for adt in ADDR_TYPES: + # Verifying RIB routes + dut = "r3" + protocol = "bgp" + input_dict_2 = { + "r4": { + "static_routes": [ + { + "network": [NETWORK[adt][1]], + "no_of_ip": 9, + "next_hop": NEXT_HOP[adt] + } + ] + } + } + + result = verify_rib(tgen, adt, dut, input_dict_2, protocol=protocol) + assert result is not True, "Testcase {} : Failed \n" + "Expected behavior: routes are not present in rib \n" + "Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r4" + input_dict = { + "r1": { + "static_routes": [ + { + "network": [NETWORK[adt][0]], + "no_of_ip": 9, + "next_hop": NEXT_HOP[adt] + } + ] + } + } + result = verify_rib(tgen, adt, dut, input_dict, protocol=protocol) + assert result is not True, "Testcase {} : Failed \n " + "Expected behavior: routes are not present in rib \n " + "Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +@pytest.mark.parametrize("prefix_action, rmap_action", [("permit", "permit"), + ("permit", "deny"), ("deny", "permit"), + ("deny", "deny")]) +def test_route_map_with_action_values_combination_of_prefix_action_p0( + request, prefix_action, rmap_action): + """ + TC_36: + Test permit/deny statements operation in route-maps with a permutation and + combination of permit/deny in prefix-lists + """ + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + + # Don"t run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + for adt in ADDR_TYPES: + # Create Static routes + input_dict = { + "r1": { + "static_routes": [ + { + "network": NETWORK[adt][0], + "no_of_ip": 9, + "next_hop": NEXT_HOP[adt] + } + ] + } + } + + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + input_dict_1 = { + "r1": { + "bgp": { + "local_as": 100, + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Permit in perfix list and route-map + input_dict_2 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1_ipv4": [{ + "seqid": 10, + "network": "any", + "action": prefix_action + }] + }, + "ipv6": { + "pf_list_1_ipv6": [{ + "seqid": 100, + "network": "any", + "action": prefix_action + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": rmap_action, + "match": { + addr_type: { + "prefix_lists": + "pf_list_1_{}".format(addr_type) + } + } + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_7 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [ + {"name": + "rmap_match_pf_1_ipv4", + "direction": "in"} + ] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [ + {"name": + "rmap_match_pf_1_ipv6", + "direction": "in"} + ] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_7) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + dut = "r3" + protocol = "bgp" + input_dict_2 = { + "r1": { + "static_routes": [ + { + "network": [NETWORK[adt][0]], + "no_of_ip": 9, + "next_hop": NEXT_HOP[adt], + } + ] + } + } + + result = verify_rib(tgen, adt, dut, input_dict_2, protocol=protocol) + if "deny" in [prefix_action, rmap_action]: + assert result is not True, "Testcase {} : Failed \n Error: {}".\ + format(tc_name, result) + else: + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + +def test_route_map_multiple_seq_different_match_set_clause_p0(request): + """ + TC_35: + Test multiple sequence numbers in a single route-map for different + match/set clauses. + """ + + tgen = get_topogen() + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + for adt in ADDR_TYPES: + # Create Static routes + input_dict = { + "r1": { + "static_routes": [{ + "network": NETWORK[adt][0], + "no_of_ip": 1, + "next_hop": NEXT_HOP[adt] + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + input_dict_1 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create ip prefix list + input_dict_2 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1_ipv4": [{ + "seqid": 10, + "network": "any", + "action": "permit" + }] + }, + "ipv6": { + "pf_list_1_ipv6": [{ + "seqid": 100, + "network": "any", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [ + { + "action": "permit", + "match": { + addr_type: { + "prefix_lists": + "pf_list_2_{}".format(addr_type) + } + }, + "set": { + "aspath": { + "as_num": 500 + } + } + }, + { + "action": "permit", + "match": { + addr_type: { + "prefix_lists": + "pf_list_2_{}".format(addr_type) + } + }, + "set": { + "localpref": 150, + } + }, + { + "action": "permit", + "match": { + addr_type: { + "prefix_lists": + "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 50 + } + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": "in" + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": "out" + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": "in" + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": "out" + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + for adt in ADDR_TYPES: + # Verifying RIB routes + dut = "r3" + protocol = "bgp" + input_dict = { + "r3": { + "route_maps": { + "rmap_match_pf_list1": [{ + "set": { + "med": 50, + } + }], + } + } + } + + static_routes = [NETWORK[adt][0]] + + time.sleep(2) + result = verify_bgp_attributes(tgen, adt, dut, static_routes, + "rmap_match_pf_list1", input_dict) + assert result is True, "Test case {} : Failed \n Error: {}". \ + format(tc_name, result) + + dut = "r4" + result = verify_bgp_attributes(tgen, adt, dut, static_routes, + "rmap_match_pf_list1", input_dict) + assert result is True, "Test case {} : Failed \n Error: {}". \ + format(tc_name, result) + + logger.info("Testcase " + tc_name + " :Passed \n") + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_route_map_set_only_no_match_p0(request): + """ + TC_37: + Test add/remove route-maps with multiple set + clauses and without any match statement.(Set only) + """ + + tgen = get_topogen() + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + for adt in ADDR_TYPES: + # Create Static routes + input_dict = { + "r1": { + "static_routes": [{ + "network": NETWORK[adt][0], + "no_of_ip": 1, + "next_hop": NEXT_HOP[adt] + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + input_dict_1 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1": [ + { + "action": "permit", + "set": { + "med": 50, + "localpref": 150, + "weight": 4000 + } + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": "rmap_match_pf_1", + "direction": "in" + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": "rmap_match_pf_1", + "direction": "out" + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": "rmap_match_pf_1", + "direction": "in" + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": "rmap_match_pf_1", + "direction": "out" + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + time.sleep(2) + for adt in ADDR_TYPES: + input_dict_4 = { + "r3": { + "route_maps": { + "rmap_match_pf_1": [ + { + "action": "permit", + "set": { + "med": 50, + } + } + ] + } + } + } + # Verifying RIB routes + static_routes = [NETWORK[adt][0]] + result = verify_bgp_attributes(tgen, adt, "r3", static_routes, + "rmap_match_pf_1", input_dict_3) + assert result is True, "Test case {} : Failed \n Error: {}". \ + format(tc_name, result) + + result = verify_bgp_attributes(tgen, adt, "r4", static_routes, + "rmap_match_pf_1", input_dict_4) + assert result is True, "Test case {} : Failed \n Error: {}". \ + format(tc_name, result) + + logger.info("Testcase " + tc_name + " :Passed \n") + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_route_map_match_only_no_set_p0(request): + """ + TC_38: + Test add/remove route-maps with multiple match + clauses and without any set statement.(Match only) + """ + + tgen = get_topogen() + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + for adt in ADDR_TYPES: + # Create Static routes + input_dict = { + "r1": { + "static_routes": [{ + "network": NETWORK[adt][0], + "no_of_ip": 1, + "next_hop": NEXT_HOP[adt] + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + input_dict_1 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create ip prefix list + input_dict_2 = { + "r1": { + "prefix_lists": { + "ipv4": { + "pf_list_1_ipv4": [{ + "seqid": 10, + "network": "any", + "action": "permit" + }] + }, + "ipv6": { + "pf_list_1_ipv6": [{ + "seqid": 100, + "network": "any", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r1": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + "set": { + "med": 50, + "localpref": 150, + } + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r1": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": "out" + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r1": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": "out" + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create ip prefix list + input_dict_5 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1_ipv4": [{ + "seqid": 10, + "network": "any", + "action": "permit" + }] + }, + "ipv6": { + "pf_list_1_ipv6": [{ + "seqid": 100, + "network": "any", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_5) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_6 = { + "r3": { + "route_maps": { + "rmap_match_pf_2_{}".format(addr_type): [{ + "action": "permit", + "match": { + addr_type: { + "prefix_lists": + "pf_list_1_{}".format(addr_type) + } + } + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_6) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_7 = { + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv4", + "direction": "in" + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv4", + "direction": "out" + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv6", + "direction": "in" + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv6", + "direction": "out" + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_7) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + for adt in ADDR_TYPES: + # Verifying RIB routes + static_routes = [NETWORK[adt][0]] + result = verify_bgp_attributes(tgen, adt, "r3", static_routes, + "rmap_match_pf_1", input_dict_3) + assert result is True, "Test case {} : Failed \n Error: {}". \ + format(tc_name, result) + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) + + diff --git a/tests/topotests/bgp-route-map/test_route_map_topo2.py b/tests/topotests/bgp-route-map/test_route_map_topo2.py new file mode 100755 index 0000000000..7009fc97ce --- /dev/null +++ b/tests/topotests/bgp-route-map/test_route_map_topo2.py @@ -0,0 +1,3916 @@ +#!/usr/bin/env python + +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, +# Inc. ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +"""Following tests are covered to test route-map functionality. +TC_57: + Create route map to match prefix-list and permit inbound + and outbound prefixes and set criteria on match +TC_52: + Test modify set/match clauses in a route-map to see + if it takes immediate effect. +TC_61: + Delete the route maps. +TC_50_1: + Test modify/remove prefix-lists referenced by a + route-map for match statement. +TC_50_1: + Remove prefix-list referencec by route-map match cluase + and verifying it reflecting as intended +TC_51: + Add and remove community-list referencec by route-map match cluase + and verifying it reflecting as intended +TC_45: + Test multiple match statements as part of a route-map's single + sequence number. (Logical OR-ed of multiple match statements) +TC_44: + Test multiple match statements as part of a route-map's single + sequence number. (Logical AND of multiple match statements) +TC_41: + Test add/remove route-maps to specific neighbor and see if + it takes effect as intended +TC_56: + Test clear BGP sessions and interface flaps to see if + route-map properties are intact. +TC_46: + Verify if a blank sequence number can be create(without any + match/set clause) and check if it allows all the traffic/prefixes +TC_48: + Create route map setting local preference and weight to eBGP peeer + and metric to ibgp peer and verifying it should not get advertised +TC_43: + Test multiple set statements as part of a route-map's + single sequence number. +TC_54: + Verify route-maps continue clause functionality. +TC_55: + Verify route-maps goto clause functionality. +TC_53: + Verify route-maps call clause functionality. +TC_58: + Create route map deny inbound and outbound prefixes on + match prefix list and set criteria on match +TC_59: + Create route map to permit inbound prefixes with filter + match tag and set criteria +TC_60 + Create route map to deny outbound prefixes with filter match tag, + and set criteria +""" + +################################# +# TOPOLOGY +################################# +""" + + +-------+ + +--------- | R2 | + | +-------+ + |iBGP | + +-------+ | + | R1 | |iBGP + +-------+ | + | | + | iBGP +-------+ eBGP +-------+ + +---------- | R3 |----------| R4 | + +-------+ +-------+ + | + |eBGP + | + +-------+ + | R5 | + +-------+ + + +""" + +import sys +import json +import time +import pytest +import inspect +import os +from time import sleep + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, '../')) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib import topotest +from lib.topogen import Topogen, get_topogen +from mininet.topo import Topo + +# Required to instantiate the topology builder class. +from lib.common_config import ( + start_topology, write_test_header, + write_test_footer, create_static_routes, + verify_rib, delete_route_maps, create_bgp_community_lists, + interface_status, create_route_maps, create_prefix_lists, + verify_route_maps, check_address_types, verify_bgp_community, + shutdown_bringup_interface, verify_prefix_lists, reset_config_on_routers) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence, create_router_bgp, + clear_bgp_and_verify, verify_bgp_attributes) +from lib.topojson import build_topo_from_json, build_config_from_json + +# Reading the data from JSON File for topology and configuration creation +jsonFile = "{}/bgp_route_map_topo2.json".format(CWD) + +try: + with open(jsonFile, 'r') as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + +# Global variables +# Global variables +bgp_convergence = False +NETWORK = { + "ipv4": ["11.0.20.1/32", "11.0.20.2/32"], + "ipv6": ["2::1/128", "2::2/128"] +} + +bgp_convergence = False +BGP_CONVERGENCE = False +ADDR_TYPES = check_address_types() + + +class BGPRmapTopo(Topo): + """BGPRmapTopo. + + BGPRmap topology 1 + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + """Build function.""" + tgen = get_topogen(self) + + # Building topology and configuration from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """setup_module. + + Set up the pytest environment + * `mod`: module name + """ + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("="*40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(BGPRmapTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + # Checking BGP convergence + global bgp_convergence + global ADDR_TYPES + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Api call verify whether BGP is converged + bgp_convergence = verify_bgp_convergence(tgen, topo) + assert bgp_convergence is True, ('setup_module :Failed \n Error:' + ' {}'.format(bgp_convergence)) + logger.info("Running setup_module() done") + + +def teardown_module(mod): + """teardown_module. + + Teardown the pytest environment. + * `mod`: module name + """ + logger.info("Running teardown_module to delete topology") + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info("Testsuite end time: {}".format( + time.asctime(time.localtime(time.time())))) + logger.info("="*40) + + +##################################################### +# Tests starting +##################################################### + + +def test_rmap_match_prefix_list_permit_in_and_outbound_prefixes_p0(): + """ + TC: 57 + Create route map to match prefix-list and permit inbound + and outbound prefixes and set criteria on match + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit', + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit', + }] + } + } + } + } + + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + for addr_type in ADDR_TYPES: + # Create route map + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_" + addr_type + } + }, + "set": { + "localpref": 150, + "weight": 100 + } + }, + ], + "rmap_match_pf_2_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_" + addr_type + } + }, + "set": { + "med": 50 + } + }, + ] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv4", + "direction": 'out' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv6", + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + + # dual stack changes + for addr_type in ADDR_TYPES: + result4 = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result4 is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result4) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + # dual stack changes + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result4 = verify_bgp_attributes(tgen, addr_type, dut, routes[ + addr_type],rmap_name, input_dict_3) + assert result4 is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result4) + + # Verifying RIB routes + dut = 'r4' + protocol = 'bgp' + # dual stack changes + for addr_type in ADDR_TYPES: + result4 = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result4 is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result4) + + # Verifying BGP set attributes + dut = 'r4' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + # dual stack changes + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_2_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_modify_set_match_clauses_in_rmap_p0(): + """ + TC_52: + Test modify set/match clauses in a route-map to see + if it takes immediate effect. + """ + + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit', + }], + 'pf_list_2_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit', + }], + 'pf_list_2_ipv6': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150, + } + }], + "rmap_match_pf_2_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 50 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv4", + "direction": 'out' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv6", + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + # dual stack changes + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result4 = verify_bgp_attributes(tgen, addr_type, dut, routes[ + addr_type],rmap_name, input_dict_3) + assert result4 is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result4) + + # Verifying RIB routes + dut = 'r4' + protocol = 'bgp' + # dual stack changes + for addr_type in ADDR_TYPES: + result4 = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result4 is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result4) + + # Verifying BGP set attributes + dut = 'r4' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_2_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[ + addr_type],rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Modify set/match clause of in-used route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 1000, + } + }], + "rmap_match_pf_2_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 2000 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r4' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r4' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_2_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_delete_route_maps_p1(): + """ + TC_61: + Delete the route maps. + """ + + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_tag_1_{}".format(addr_type): [{ + "action": "deny", + "match": { + addr_type: { + "tag": "4001" + } + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Delete route maps + for addr_type in ADDR_TYPES: + input_dict = { + 'r3': { + 'route_maps': ['rmap_match_tag_1_{}'.format(addr_type)] + } + } + result = delete_route_maps(tgen, input_dict) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + result = verify_route_maps(tgen, input_dict) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_modify_prefix_list_referenced_by_rmap_p0(): + """ + TC_50_1: + Test modify/remove prefix-lists referenced by a + route-map for match statement. + """ + + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit', + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit', + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150, + "weight": 100 + } + }], + "rmap_match_pf_2_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 50 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv4", + "direction": 'out' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv6", + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r4' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r4' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_2_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Modify ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'deny' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'deny' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + sleep(5) + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is not True, 'Testcase {} : Failed \n' + 'Expected behaviour: routes are not present \n ' + 'Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r4' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is not True, 'Testcase {} : Failed \n' + 'Expected behaviour: routes are not present \n ' + 'Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_remove_prefix_list_referenced_by_rmap_p0(): + """ + TC_50_1: + Remove prefix-list referencec by route-map match cluase + and verifying it reflecting as intended + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150, + } + }], + "rmap_match_pf_2_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 50 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + for addr_type in ADDR_TYPES: + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv4", + "direction": 'out' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv6", + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r4' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r4' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_2_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Remove/Delete prefix list + input_dict_3 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit', + 'delete': True + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit', + 'delete': True + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + result = verify_prefix_lists(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Api call to clear bgp, so config changes would be reflected + dut = 'r3' + result = clear_bgp_and_verify(tgen, topo, dut) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is not True, 'Testcase {} : Failed \n' + 'Expected behaviour: routes are not present \n ' + 'Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r4' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is not True, 'Testcase {} : Failed \n' + 'Expected behaviour: routes are not present \n ' + 'Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_add_and_remove_community_list_referenced_by_rmap_p0(): + """ + TC_51: + Add and remove community-list referencec by route-map match cluase + and verifying it reflecting as intended + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Creating configuration from JSON + # build_config_from_json(tgen, topo) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_5 = { + "r1": { + "route_maps": { + "rm_r1_out_{}".format(addr_type): [{ + "action": "permit", + "set": { + "large_community": {"num": "1:1:1 1:2:3 2:1:1 2:2:2"} + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_5) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_6 = { + 'r1': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r1": { + "route_maps": [{ + "name": "rm_r1_out_ipv4", + "direction": 'out' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r1": { + "route_maps": [{ + "name": "rm_r1_out_ipv6", + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_6) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + for addr_type in ADDR_TYPES: + # Create standard large commumity-list + input_dict_1 = { + "r3": { + "bgp_community_lists": [ + { + "community_type": "standard", + "action": "permit", + "name": "rmap_lcomm_{}".format(addr_type), + "value": "1:1:1 1:2:3 2:1:1 2:2:2", + "large": True + } + ] + } + } + result = create_bgp_community_lists(tgen, input_dict_1) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + for addr_type in ADDR_TYPES: + # Create route map + input_dict_2 = { + "r3": { + "route_maps": { + "rm_r3_in_{}".format(addr_type): [{ + "action": "permit", + "match": { + addr_type : { + "large-community-list": {"id": "rmap_lcomm_"+ + addr_type} + } + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_3 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": "rm_r3_in_ipv4", + "direction": 'in' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": "rm_r3_in_ipv6", + "direction": 'in' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_3) + + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + sleep(5) + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verify large-community-list + dut = 'r3' + networks = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + input_dict_4 = { + 'largeCommunity': '1:1:1 1:2:3 2:1:1 2:2:2' + } + for addr_type in ADDR_TYPES: + result = verify_bgp_community(tgen, addr_type, dut, networks[ + addr_type],input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_multiple_match_statement_in_route_map_logical_ORed_p0(): + """ + TC_45: + Test multiple match statements as part of a route-map's single + sequence number. (Logical OR-ed of multiple match statements) + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict_nw1 = { + 'r1': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + {"network": '10.0.30.1/32'} + ] + } + }, + "ipv6": { + "unicast": { + "advertise_networks": [ + {"network": '1::1/128'} + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_nw1) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Api call to advertise networks + input_dict_nw2 = { + 'r1': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + {"network": '20.0.30.1/32'} + ] + } + }, + "ipv6": { + "unicast": { + "advertise_networks": [ + {"network": '2::1/128'} + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_nw2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_2_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_2_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + input_dict_3_addr_type ={} + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150 + } + }] + } + } + } + input_dict_3_addr_type[addr_type] = input_dict_3 + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 200 + } + }] + } + } + } + input_dict_3_addr_type[addr_type] = input_dict_3 + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_6 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_6) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.30.1/32"], + "ipv6": ["1::1/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3_addr_type[addr_type]) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + routes = { + "ipv4": ["20.0.30.1/32"], + "ipv6": ["2::1/128"] + } + for addr_type in ADDR_TYPES: + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_multiple_match_statement_in_route_map_logical_ANDed(): + """ + TC_44: + Test multiple match statements as part of a route-map's single + sequence number. (Logical AND of multiple match statements) + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_5 = { + "r1": { + "route_maps": { + "rm_r1_out_{}".format(addr_type): [{ + "action": "permit", + "set": { + "large_community": { + "num": "1:1:1 1:2:3 2:1:1 2:2:2"} + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_5) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + for addr_type in ADDR_TYPES: + input_dict_6 = { + 'r1': { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r1": { + "route_maps": [{ + "name": + "rm_r1_out_{}".format(addr_type), + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_6) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + for addr_type in ADDR_TYPES: + # Create standard large commumity-list + input_dict_1 = { + "r3": { + "bgp_community_lists": [ + { + "community_type": "standard", + "action": "permit", + "name": "rmap_lcomm_{}".format(addr_type), + "value": "1:1:1 1:2:3 2:1:1 2:2:2", + "large": True + } + ] + } + } + result = create_bgp_community_lists(tgen, input_dict_1) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150, + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + for addr_type in ADDR_TYPES: + # Create route map + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type : { + "large_community_list": {"id": "rmap_lcomm_"+ + addr_type} + } + }, + "set": { + "localpref": 150, + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + # Configure neighbor for route map + for addr_type in ADDR_TYPES: + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_{}".format(addr_type), + "direction": 'in' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + # sleep(10) + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_add_remove_rmap_to_specific_neighbor_p0(): + """ + TC_41: + Test add/remove route-maps to specific neighbor and see if + it takes effect as intended + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'deny' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'deny' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150, + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is not True, 'Testcase {} : \n' + 'Expected Behavior: Routes are not present in RIB \n' + ' Error: {}'.format( + tc_name, result) + + # Remove applied rmap from neighbor + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in', + "delete": True + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in', + "delete": True + }] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_clear_bgp_and_flap_interface_to_verify_rmap_properties_p0(): + """ + TC_56: + Test clear BGP sessions and interface flaps to see if + route-map properties are intact. + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150, + "weight": 100 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # clear bgp, so config changes would be reflected + dut = 'r3' + result = clear_bgp_and_verify(tgen, topo, dut) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Flap interface to see if route-map properties are intact + # Shutdown interface + dut = "r3" + intf = "r3-r1-eth0" + shutdown_bringup_interface(tgen, dut, intf, False) + + sleep(5) + + # Bringup interface + dut = "r3" + intf = "r3-r1-eth0" + shutdown_bringup_interface(tgen, dut, intf, True) + + # Verify BGP convergence once interface is up + result = verify_bgp_convergence(tgen, topo) + assert result is True, ( + 'setup_module :Failed \n Error:' ' {}'.format(result)) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_rmap_without_match_and_set_clause_p0(): + """ + TC_46: + Verify if a blank sequence number can be create(without any + match/set clause) and check if it allows all the traffic/prefixes + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_no_match_set_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '5' + }], + "rmap_no_match_set_2_{}".format(addr_type): [{ + "action": "deny", + 'seq_id': '5' + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_no_match_set_1_ipv4", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_no_match_set_2_ipv4", + "direction": 'out' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_no_match_set_1_ipv6", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_no_match_set_2_ipv6", + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r4' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is not True, 'Testcase {} : Failed \n' + 'Expected behaviour: routes are not present \n ' + 'Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_set_localpref_weight_to_ebgp_and_med_to_ibgp_peers_p0(): + """ + TC_48: + Create route map setting local preference and weight to eBGP peeer + and metric to ibgp peer and verifying it should not get advertised + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + input_dict_3_addr_type ={} + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format( + addr_type) + } + }, + "set": { + "med": 50 + } + }], + "rmap_match_pf_2_{}".format(addr_type): [{ + "action": "permit", + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format( + addr_type) + }}, + "set": { + "localpref": 150 + } + }], + "rmap_match_pf_3_{}".format(addr_type): [{ + "action": "permit", + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format( + addr_type) + }}, + "set": { + "weight": 1000 + } + }] + } + } + } + input_dict_3_addr_type[addr_type] = input_dict_3 + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv4", + "direction": 'out' + }] + } + } + }, + "r5": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_3_ipv4", + "direction": 'out' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv6", + "direction": 'out' + }] + } + } + }, + "r5": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_3_ipv6", + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + rmap_name = "rmap_match_pf_1" + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[ + addr_type],rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r4' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r4' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + rmap_name = "rmap_match_pf_2" + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_2_{}".format(addr_type) + + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3_addr_type[addr_type]) + assert result is not True, 'Testcase {} : Failed \n' + 'Expected behaviour: Attributes are not set \n' + 'Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r5' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + # Verifying BGP set attributes + dut = 'r5' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + + rmap_name = "rmap_match_pf_3" + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_3_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3_addr_type[addr_type]) + assert result is not True, 'Testcase {} : Failed \n' + 'Expected behaviour: Attributes are not set \n' + 'Error: {}'.format( + tc_name, result) + + logger.info("Expected behaviour: {}".format(result)) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_multiple_set_on_single_sequence_in_rmap_p0(): + """ + TC_43: + Test multiple set statements as part of a route-map's + single sequence number. + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150, + "weight": 100, + "med": 50 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + + rmap_name = "rmap_match_pf_1" + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_route_maps_with_continue_clause_p0(): + """ + TC_54: + Verify route-maps continue clause functionality. + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '10', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150 + }, + "continue": "30" + }, + { + "action": "permit", + 'seq_id': '20', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 200 + } + }, + { + "action": "permit", + 'seq_id': '30', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 100 + } + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + rmap_name = "rmap_match_pf_1" + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + seq_id = { + "ipv4": ["10", "30"], + "ipv6": ["10", "30"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[ + addr_type],rmap_name, input_dict_3, seq_id[addr_type]) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_route_maps_with_goto_clause_p0(): + """ + TC_55: + Verify route-maps goto clause functionality. + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + 'seq_id': '10', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "goto": "30" + }, + { + "action": "permit", + 'seq_id': '20', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 100 + } + }, + { + "action": "permit", + 'seq_id': '30', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 200 + } + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + rmap_name = "rmap_match_pf_1" + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + seq_id = { + "ipv4": ["10", "30"], + "ipv6": ["10", "30"] + } + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[ + addr_type],rmap_name, input_dict_3, seq_id[addr_type]) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_route_maps_with_call_clause_p0(): + """ + TC_53: + Verify route-maps call clause functionality. + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "permit", + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150 + }, + "call": "rmap_match_pf_2_{}".format(addr_type) + }], + "rmap_match_pf_2_{}".format(addr_type): [{ + "action": "permit", + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 200 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv6", + "direction": 'in' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying BGP set attributes + dut = 'r3' + routes = { + "ipv4": ["10.0.20.1/32", "10.0.20.2/32"], + "ipv6": ["1::1/128", "1::2/128"] + } + rmap_name = "rmap_match_pf_1" + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_1_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + rmap_name = "rmap_match_pf_2" + for addr_type in ADDR_TYPES: + rmap_name = "rmap_match_pf_2_{}".format(addr_type) + result = verify_bgp_attributes(tgen, addr_type, dut, routes[addr_type], + rmap_name, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_create_rmap_match_prefix_list_to_deny_in_and_outbound_prefixes_p0(): + """ + TC_58: + Create route map deny inbound and outbound prefixes on + match prefix list and set criteria on match + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + 'r3': { + 'prefix_lists': { + 'ipv4': { + 'pf_list_1_ipv4': [{ + 'seqid': 10, + 'network': 'any', + 'action': 'permit' + }] + }, + 'ipv6': { + 'pf_list_1_ipv6': [{ + 'seqid': 100, + 'network': 'any', + 'action': 'permit' + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Create route map + for addr_type in ADDR_TYPES: + input_dict_3 = { + "r3": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [{ + "action": "deny", + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "localpref": 150, + } + }], + "rmap_match_pf_2_{}".format(addr_type): [{ + "action": "deny", + "match": { + addr_type: { + "prefix_lists": "pf_list_1_{}".format(addr_type) + } + }, + "set": { + "med": 50 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r3': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv6", + "direction": 'out' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_1_ipv4", + "direction": 'in' + }] + } + } + }, + "r4": { + "dest_link": { + "r3": { + "route_maps": [{ + "name": + "rmap_match_pf_2_ipv6", + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + input_dict = topo["routers"] + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is not True, 'Testcase {} : Failed \n' + 'Expected behaviour: routes are not present \n ' + 'Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r4' + protocol = 'bgp' + for addr_type in ADDR_TYPES: + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is not True, 'Testcase {} : Failed \n' + 'Expected behaviour: routes are not present \n ' + 'Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_create_rmap_to_match_tag_permit_inbound_prefixes_p0(): + """ + TC_59: + Create route map to permit inbound prefixes with filter + match tag and set criteria + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + for addr_type in ADDR_TYPES: + # Create Static routes + input_dict = { + "r1": { + "static_routes": [ + { + "network": NETWORK[addr_type], + "next_hop": "Null0", + "tag": 4001 + } + ] + } + } + + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + input_dict_1 = { + "r1": { + "bgp": { + "local_as": 100, + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + input_dict_3 = { + "r1": { + "route_maps": { + "rmap_match_tag_1_{}".format(addr_type): [{ + "action": "permit", + "match": { + addr_type: { + "tag": "4001" + } + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r1': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r1": { + "route_maps": [{ + "name": + "rmap_match_tag_1_ipv4", + "direction": 'out' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r1": { + "route_maps": [{ + "name": + "rmap_match_tag_1_ipv6", + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + + for addr_type in ADDR_TYPES: + input_dict = { + "r1": { + "static_routes": [ + { + "network": NETWORK[addr_type], + "next_hop": "Null0", + "tag": 4001 + } + ] + } + } + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_create_rmap_to_match_tag_deny_outbound_prefixes_p0(): + """ + TC_60 + Create route map to deny outbound prefixes with filter match tag, + and set criteria + """ + tgen = get_topogen() + global bgp_convergence + + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = inspect.stack()[0][3] + write_test_header(tc_name) + reset_config_on_routers(tgen) + + for addr_type in ADDR_TYPES: + # Create Static routes + input_dict = { + "r1": { + "static_routes": [ + { + "network": NETWORK[addr_type], + "next_hop": "Null0", + "tag": 4001 + } + ] + } + } + + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + input_dict_1 = { + "r1": { + "bgp": { + "local_as": 100, + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + input_dict_3 = { + "r1": { + "route_maps": { + "rmap_match_tag_1_{}".format(addr_type): [{ + "action": "deny", + "match": { + addr_type: { + "tag": "4001" + } + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + 'r1': { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r1": { + "route_maps": [{ + "name": + "rmap_match_tag_1_ipv4", + "direction": 'out' + }] + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r1": { + "route_maps": [{ + "name": + "rmap_match_tag_1_ipv6", + "direction": 'out' + }] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, 'Testcase {} : Failed \n Error: {}'.format( + tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + + for addr_type in ADDR_TYPES: + input_dict = { + "r1": { + "static_routes": [ + { + "network": NETWORK[addr_type], + "next_hop": "Null0", + "tag": 4001 + } + ] + } + } + result = verify_rib(tgen, addr_type, dut, input_dict, + protocol=protocol) + assert result is not True, 'Testcase {} : Failed \n' + 'Expected behavior: routes are denied \n Error: {}'.format( + tc_name, result) + + write_test_footer(tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + +if __name__ == '__main__': + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index c05e14a95e..bfc34c25e4 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -1229,6 +1229,150 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): return True +@retry(attempts=3, wait=4, return_is_str=True) +def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name, + input_dict, seq_id=None): + """ + API will verify BGP attributes set by Route-map for given prefix and + DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command + in DUT to verify BGP attributes set by route-map, Set attributes + values will be read from input_dict and verified with command output. + + * `tgen`: topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test + * `static_routes`: Static Routes for which BGP set attributes needs to be + verified + * `rmap_name`: route map name for which set criteria needs to be verified + * `input_dict`: defines for which router, AS numbers needs + * `seq_id`: sequence number of rmap, default is None + + Usage + ----- + input_dict = { + "r3": { + "route_maps": { + "rmap_match_pf_1_ipv4": [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_" + addr_type + } + }, + "set": { + "localpref": 150, + "weight": 100 + } + }], + "rmap_match_pf_2_ipv6": [{ + "action": "permit", + 'seq_id': '5', + "match": { + addr_type: { + "prefix_lists": "pf_list_1_" + addr_type + } + }, + "set": { + "med": 50 + } + }] + } + } + } + result = verify_bgp_attributes(tgen, 'ipv4', "r1", "10.0.20.1/32", + rmap_match_pf_1_ipv4, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_bgp_attributes()") + for router, rnode in tgen.routers().iteritems(): + if router != dut: + continue + + logger.info('Verifying BGP set attributes for dut {}:'.format(router)) + + for static_route in static_routes: + cmd = "show bgp {} {} json".format(addr_type, static_route) + show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True) + print("show_bgp_json $$$$$", show_bgp_json) + + dict_to_test = [] + tmp_list = [] + for rmap_router in input_dict.keys(): + for rmap, values in input_dict[rmap_router][ + "route_maps"].items(): + print("rmap == rmap_name $$$$1", rmap, rmap_name) + if rmap == rmap_name: + print("rmap == rmap_name $$$$", rmap, rmap_name) + dict_to_test = values + for rmap_dict in values: + if seq_id is not None: + if type(seq_id) is not list: + seq_id = [seq_id] + + if "seq_id" in rmap_dict: + rmap_seq_id = \ + rmap_dict["seq_id"] + for _seq_id in seq_id: + if _seq_id == rmap_seq_id: + tmp_list.append(rmap_dict) + if tmp_list: + dict_to_test = tmp_list + + print("dict_to_test $$$$", dict_to_test) + for rmap_dict in dict_to_test: + if "set" in rmap_dict: + for criteria in rmap_dict["set"].keys(): + if criteria not in show_bgp_json[ + "paths"][0]: + errormsg = ("BGP attribute: {}" + " is not found in" + " cli: {} output " + "in router {}". + format(criteria, + cmd, + router)) + return errormsg + + if rmap_dict["set"][criteria] == \ + show_bgp_json["paths"][0][ + criteria]: + logger.info("Verifying BGP " + "attribute {} for" + " route: {} in " + "router: {}, found" + " expected value:" + " {}". + format(criteria, + static_route, + dut, + rmap_dict[ + "set"][ + criteria])) + else: + errormsg = \ + ("Failed: Verifying BGP " + "attribute {} for route:" + " {} in router: {}, " + " expected value: {} but" + " found: {}". + format(criteria, + static_route, + dut, + rmap_dict["set"] + [criteria], + show_bgp_json[ + 'paths'][ + 0][criteria])) + return errormsg + + logger.debug("Exiting lib API: verify_bgp_attributes()") + return True + @retry(attempts=3, wait=2, return_is_str=True) def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, attribute): diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index f2d33f94ae..ba8a4cb0f4 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -21,6 +21,7 @@ from collections import OrderedDict from datetime import datetime from time import sleep +from copy import deepcopy from subprocess import call from subprocess import STDOUT as SUB_STDOUT from subprocess import PIPE as SUB_PIPE @@ -208,6 +209,7 @@ def create_common_configuration(tgen, router, data, config_type=None, "interface_config": "! Interfaces Config\n", "static_route": "! Static Route Config\n", "prefix_list": "! Prefix List Config\n", + "bgp_community_list": "! Community List Config\n", "route_maps": "! Route Maps Config\n", "bgp": "! BGP Config\n" }) @@ -547,13 +549,11 @@ def generate_ips(network, no_of_ips): Returns list of IPs. based on start_ip and no_of_ips - * `network` : from here the ip will start generating, start_ip will be - first ip + * `network` : from here the ip will start generating, + start_ip will be * `no_of_ips` : these many IPs will be generated - - Limitation: It will generate IPs only for ip_mask 32 - """ + ipaddress_list = [] if type(network) is not list: network = [network] @@ -893,6 +893,7 @@ def create_static_routes(tgen, input_dict, build=False): """ result = False logger.debug("Entering lib API: create_static_routes()") + input_dict = deepcopy(input_dict) try: for router in input_dict.keys(): if "static_routes" not in input_dict[router]: @@ -918,16 +919,21 @@ def create_static_routes(tgen, input_dict, build=False): next_hop = static_route["next_hop"] network = static_route["network"] - ip_list = generate_ips([network], no_of_ip) + if type(network) is not list: + network = [network] + + ip_list = generate_ips(network, no_of_ip) for ip in ip_list: addr_type = validate_ip_address(ip) + if addr_type == "ipv4": cmd = "ip route {} {}".format(ip, next_hop) else: cmd = "ipv6 route {} {}".format(ip, next_hop) if tag: - cmd = "{} {}".format(cmd, str(tag)) + cmd = "{} tag {}".format(cmd, str(tag)) + if admin_distance: cmd = "{} {}".format(cmd, admin_distance) @@ -1112,11 +1118,11 @@ def create_route_maps(tgen, input_dict, build=False): "prefix_list": "pf_list_1" } - "large-community-list": "{ + "large-community-list": { "id": "community_1", "exact_match": True } - "community": { + "community_list": { "id": "community_2", "exact_match": True } @@ -1152,12 +1158,11 @@ def create_route_maps(tgen, input_dict, build=False): result = False logger.debug("Entering lib API: create_route_maps()") - + input_dict = deepcopy(input_dict) try: for router in input_dict.keys(): if "route_maps" not in input_dict[router]: - errormsg = "route_maps not present in input_dict" - logger.debug(errormsg) + logger.debug("route_maps not present in input_dict") continue rmap_data = [] for rmap_name, rmap_value in \ @@ -1187,10 +1192,41 @@ def create_route_maps(tgen, input_dict, build=False): rmap_name, rmap_action, seq_id )) + if "continue" in rmap_dict: + continue_to = rmap_dict["continue"] + if continue_to: + rmap_data.append("on-match goto {}". + format(continue_to)) + else: + logger.error("In continue, 'route-map entry " + "sequence number' is not provided") + return False + + if "goto" in rmap_dict: + go_to = rmap_dict["goto"] + if go_to: + rmap_data.append("on-match goto {}". + format(go_to)) + else: + logger.error("In goto, 'Goto Clause number' is not" + " provided") + return False + + if "call" in rmap_dict: + call_rmap = rmap_dict["call"] + if call_rmap: + rmap_data.append("call {}". + format(call_rmap)) + else: + logger.error("In call, 'destination Route-Map' is" + " not provided") + return False + # Verifying if SET criteria is defined if "set" in rmap_dict: set_data = rmap_dict["set"] - + ipv4_data = set_data.setdefault("ipv4", {}) + ipv6_data = set_data.setdefault("ipv6", {}) local_preference = set_data.setdefault("localpref", None) metric = set_data.setdefault("med", None) @@ -1199,7 +1235,10 @@ def create_route_maps(tgen, input_dict, build=False): community = set_data.setdefault("community", {}) large_community = set_data.setdefault( "large_community", {}) + large_comm_list = set_data.setdefault( + "large_comm_list", {}) set_action = set_data.setdefault("set_action", None) + nexthop = set_data.setdefault("nexthop", None) # Local Preference if local_preference: @@ -1243,42 +1282,84 @@ def create_route_maps(tgen, input_dict, build=False): rmap_data.append(cmd) else: - logger.errror("In large_community, AS Num not" - " provided") + logger.error("In large_community, AS Num not" + " provided") + return False + if large_comm_list: + id = large_comm_list.setdefault("id", None) + del_comm = large_comm_list.setdefault("delete", + None) + if id: + cmd = "set large-comm-list {}".format(id) + if del_comm: + cmd = "{} delete".format(cmd) + + rmap_data.append(cmd) + else: + logger.error("In large_comm_list 'id' not" + " provided") return False # Weight if weight: - rmap_data.append("set weight {}".format( + rmap_data.append("set weight {} \n".format( weight)) - + if ipv6_data: + nexthop = ipv6_data.setdefault("nexthop",None) + if nexthop: + rmap_data.append("set ipv6 next-hop \ + {}".format(nexthop)) # Adding MATCH and SET sequence to RMAP if defined if "match" in rmap_dict: match_data = rmap_dict["match"] ipv4_data = match_data.setdefault("ipv4", {}) ipv6_data = match_data.setdefault("ipv6", {}) - community = match_data.setdefault("community-list", - {}) + community = match_data.setdefault( + "community_list",{}) large_community = match_data.setdefault( - "large-community-list", {} + "large_community", {} + ) + large_community_list = match_data.setdefault( + "large_community_list", {} ) - tag = match_data.setdefault("tag", None) if ipv4_data: - prefix_name = ipv4_data.setdefault("prefix_lists", - None) + # fetch prefix list data from rmap + prefix_name = \ + ipv4_data.setdefault("prefix_lists", + None) if prefix_name: - rmap_data.append("match ip address prefix-list" - " {}".format(prefix_name)) + rmap_data.append("match ip address" + " prefix-list {}".format(prefix_name)) + + # fetch tag data from rmap + tag = ipv4_data.setdefault("tag", None) + if tag: + rmap_data.append("match tag {}".format(tag)) + + # fetch large community data from rmap + large_community_list = ipv4_data.setdefault( + "large_community_list",{}) + large_community = match_data.setdefault( + "large_community", {}) + if ipv6_data: prefix_name = ipv6_data.setdefault("prefix_lists", None) if prefix_name: - rmap_data.append("match ipv6 address " - "prefix-list {}". - format(prefix_name)) - if tag: - rmap_data.append("match tag {}".format(tag)) + rmap_data.append("match ipv6 address" + " prefix-list {}".format(prefix_name)) + + # fetch tag data from rmap + tag = ipv6_data.setdefault("tag", None) + if tag: + rmap_data.append("match tag {}".format(tag)) + + # fetch large community data from rmap + large_community_list = ipv6_data.setdefault( + "large_community_list",{}) + large_community = match_data.setdefault( + "large_community", {}) if community: if "id" not in community: @@ -1293,10 +1374,9 @@ def create_route_maps(tgen, input_dict, build=False): cmd = "{} exact-match".format(cmd) rmap_data.append(cmd) - if large_community: if "id" not in large_community: - logger.error("'num' is mandatory for " + logger.error("'id' is mandatory for " "large-community-list in match " "criteria") return False @@ -1306,7 +1386,19 @@ def create_route_maps(tgen, input_dict, build=False): "exact_match", False) if exact_match: cmd = "{} exact-match".format(cmd) - + rmap_data.append(cmd) + if large_community_list: + if "id" not in large_community_list: + logger.error("'id' is mandatory for " + "large-community-list in match " + "criteria") + return False + cmd = "match large-community {}".format( + large_community_list["id"]) + exact_match = large_community_list.setdefault( + "exact_match", False) + if exact_match: + cmd = "{} exact-match".format(cmd) rmap_data.append(cmd) result = create_common_configuration(tgen, router, @@ -1320,10 +1412,178 @@ def create_route_maps(tgen, input_dict, build=False): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: create_prefix_lists()") + logger.debug("Exiting lib API: create_route_maps()") + return result + + +def delete_route_maps(tgen, input_dict): + """ + Delete ip route maps from device + + * `tgen` : Topogen object + * `input_dict` : for which router, + route map has to be deleted + + Usage + ----- + # Delete route-map rmap_1 and rmap_2 from router r1 + input_dict = { + "r1": { + "route_maps": ["rmap_1", "rmap__2"] + } + } + result = delete_route_maps("ipv4", input_dict) + + Returns + ------- + errormsg(str) or True + """ + logger.info("Entering lib API: delete_route_maps()") + + for router in input_dict.keys(): + route_maps = input_dict[router]["route_maps"][:] + rmap_data = input_dict[router] + rmap_data["route_maps"] = {} + for route_map_name in route_maps: + rmap_data["route_maps"].update({ + route_map_name: + [{ + "delete": True + }] + }) + + return create_route_maps(tgen, input_dict) + + +def create_bgp_community_lists(tgen, input_dict, build=False): + """ + Create bgp community-list or large-community-list on the devices as per + the arguments passed. Takes list of communities in input. + + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : Input dict data, required when configuring from testcase + * `build` : Only for initial setup phase this is set as True. + Usage + ----- + input_dict_1 = { + "r3": { + "bgp_community_lists": [ + { + "community_type": "standard", + "action": "permit", + "name": "rmap_lcomm_{}".format(addr_type), + "value": "1:1:1 1:2:3 2:1:1 2:2:2", + "large": True + } + ] + } + } + } + result = create_bgp_community_lists(tgen, input_dict_1) + """ + + result = False + logger.debug("Entering lib API: create_bgp_community_lists()") + input_dict = deepcopy(input_dict) + try: + for router in input_dict.keys(): + if "bgp_community_lists" not in input_dict[router]: + errormsg = "bgp_community_lists not present in input_dict" + logger.debug(errormsg) + continue + + config_data = [] + + community_list = input_dict[router]["bgp_community_lists"] + for community_dict in community_list: + del_action = community_dict.setdefault("delete", False) + community_type = community_dict.setdefault("community_type", + None) + action = community_dict.setdefault("action", None) + value = community_dict.setdefault("value", '') + large = community_dict.setdefault("large", None) + name = community_dict.setdefault("name", None) + if large: + cmd = "bgp large-community-list" + else: + cmd = "bgp community-list" + + if not large and not (community_type and action and value): + errormsg = "community_type, action and value are " \ + "required in bgp_community_list" + logger.error(errormsg) + return False + + try: + community_type = int(community_type) + cmd = "{} {} {} {}".format(cmd, community_type, action, + value) + except ValueError: + + cmd = "{} {} {} {} {}".format( + cmd, community_type, name, action, value) + + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + result = create_common_configuration(tgen, router, config_data, + "bgp_community_list", + build=build) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: create_bgp_community_lists()") return result +def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False): + """ + Shutdown or bringup router's interface " + + * `tgen` : Topogen object + * `dut` : Device under test + * `intf_name` : Interface name to be shut/no shut + * `ifaceaction` : Action, to shut/no shut interface, + by default is False + + Usage + ----- + dut = "r3" + intf = "r3-r1-eth0" + # Shut down ineterface + shutdown_bringup_interface(tgen, dut, intf, False) + + # Bring up ineterface + shutdown_bringup_interface(tgen, dut, intf, True) + + Returns + ------- + errormsg(str) or True + """ + + from topotest import interface_set_status + router_list = tgen.routers() + if ifaceaction: + logger.info("Bringing up interface : {}".format(intf_name)) + else: + logger.info("Shutting down interface : {}".format(intf_name)) + + interface_set_status(router_list[dut], intf_name, + ifaceaction) + + if ifaceaction: + # Disabling v6 link local once interfac comes up back + disable_v6_link_local(tgen, dut, intf_name=intf_name) + + ############################################# # Verification APIs ############################################# @@ -1625,5 +1885,133 @@ def verify_prefix_lists(tgen, input_dict): logger.info("Prefix list %s is/are not present in the router" " from router %s", prefix_list, router) - logger.debug("Exiting lib API: verify_prefix_lissts()") + logger.debug("Exiting lib API: verify_prefix_lists()") + return True + + +@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2) +def verify_route_maps(tgen, input_dict): + """ + Running "show route-map" command and verifying given route-map + is present in router. + Parameters + ---------- + * `tgen` : topogen object + * `input_dict`: data to verify prefix lists + Usage + ----- + # To verify rmap_1 and rmap_2 are present in router r1 + input_dict = { + "r1": { + "route_maps": ["rmap_1", "rmap_2"] + } + } + result = verify_route_maps(tgen, input_dict) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_route_maps()") + + for router in input_dict.keys(): + if router not in tgen.routers(): + continue + + rnode = tgen.routers()[router] + # Show ip route-map + show_route_maps = rnode.vtysh_cmd("show route-map") + + # Verify route-map is deleted + route_maps = input_dict[router]["route_maps"] + for route_map in route_maps: + if route_map in show_route_maps: + errormsg = ("Route map {} is not deleted from router" + " {}".format(route_map, router)) + return errormsg + + logger.info("Route map %s is/are deleted successfully from" + " router %s", route_maps, router) + + logger.debug("Exiting lib API: verify_route_maps()") + return True + + +@retry(attempts=3, wait=4, return_is_str=True) +def verify_bgp_community(tgen, addr_type, router, network, input_dict=None): + """ + API to veiryf BGP large community is attached in route for any given + DUT by running "show bgp ipv4/6 {route address} json" command. + + Parameters + ---------- + * `tgen`: topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test + * `network`: network for which set criteria needs to be verified + * `input_dict`: having details like - for which router, community and + values needs to be verified + Usage + ----- + networks = ["200.50.2.0/32"] + input_dict = { + "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5" + } + result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_bgp_community()") + if router not in tgen.routers(): + return False + + rnode = tgen.routers()[router] + + logger.debug("Verifying BGP community attributes on dut %s: for %s " + "network %s", router, addr_type, network) + + for net in network: + cmd = "show bgp {} {} json".format(addr_type, net) + show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True) + logger.info(show_bgp_json) + if "paths" not in show_bgp_json: + return "Prefix {} not found in BGP table of router: {}". \ + format(net, router) + + as_paths = show_bgp_json["paths"] + found = False + for i in range(len(as_paths)): + if "largeCommunity" in show_bgp_json["paths"][i] or \ + "community" in show_bgp_json["paths"][i]: + found = True + logger.info("Large Community attribute is found for route:" + " %s in router: %s", net, router) + if input_dict is not None: + for criteria, comm_val in input_dict.items(): + show_val = show_bgp_json["paths"][i][criteria][ + "string"] + if comm_val == show_val: + logger.info("Verifying BGP %s for prefix: %s" + " in router: %s, found expected" + " value: %s", criteria, net, router, + comm_val) + else: + errormsg = "Failed: Verifying BGP attribute" \ + " {} for route: {} in router: {}" \ + ", expected value: {} but found" \ + ": {}".format( + criteria, net, router, comm_val, + show_val) + return errormsg + + if not found: + errormsg = ( + "Large Community attribute is not found for route: " + "{} in router: {} ".format(net, router)) + return errormsg + + logger.debug("Exiting lib API: verify_bgp_community()") return True diff --git a/tests/topotests/lib/topojson.py b/tests/topotests/lib/topojson.py index 7a00fe4c50..fff5a1e82f 100644 --- a/tests/topotests/lib/topojson.py +++ b/tests/topotests/lib/topojson.py @@ -35,6 +35,7 @@ from lib.common_config import ( create_static_routes, create_prefix_lists, create_route_maps, + create_bgp_community_lists ) from lib.bgp import create_router_bgp @@ -179,6 +180,7 @@ def build_config_from_json(tgen, topo, save_bkup=True): ("links", create_interfaces_cfg), ("static_routes", create_static_routes), ("prefix_lists", create_prefix_lists), + ("bgp_community_list", create_bgp_community_lists), ("route_maps", create_route_maps), ("bgp", create_router_bgp) ]) |
