]> git.puffer.fish Git - matthieu/frr.git/commitdiff
tests: Adding new test suite bgp_communities_topo1
authorKuldeep Kashyap <kashyapk@vmware.com>
Wed, 1 Apr 2020 06:56:54 +0000 (06:56 +0000)
committerKuldeep Kashyap <kashyapk@vmware.com>
Wed, 8 Apr 2020 12:27:23 +0000 (12:27 +0000)
1. Added 1 test case to verify NO-ADVERTISE Community functionality
2. Enhanced bgp.py to exclude routers from verification, if doesn't have bgp config

Signed-off-by: Kuldeep Kashyap <kashyapk@vmware.com>
tests/topotests/bgp_communities_topo1/bgp_communities.json [new file with mode: 0644]
tests/topotests/bgp_communities_topo1/test_bgp_communities.py [new file with mode: 0644]
tests/topotests/lib/bgp.py
tests/topotests/lib/common_config.py

diff --git a/tests/topotests/bgp_communities_topo1/bgp_communities.json b/tests/topotests/bgp_communities_topo1/bgp_communities.json
new file mode 100644 (file)
index 0000000..da6aec2
--- /dev/null
@@ -0,0 +1,175 @@
+{
+    "address_types": [
+        "ipv4",
+        "ipv6"
+    ],
+    "ipv4base": "10.0.0.0",
+    "ipv4mask": 30,
+    "ipv6base": "fd00::",
+    "ipv6mask": 64,
+    "link_ip_start": {
+        "ipv4": "10.0.0.0",
+        "v4mask": 30,
+        "ipv6": "fd00::",
+        "v6mask": 64
+    },
+    "lo_prefix": {
+        "ipv4": "1.0.",
+        "v4mask": 32,
+        "ipv6": "2001:db8:f::",
+        "v6mask": 128
+    },
+    "routers": {
+        "r1": {
+            "links": {
+                "lo": {
+                    "ipv4": "auto",
+                    "ipv6": "auto",
+                    "type": "loopback"
+                },
+                "r2": {
+                    "ipv4": "auto",
+                    "ipv6": "auto"
+                },
+                "r0": {
+                    "ipv4": "auto",
+                    "ipv6": "auto"
+                }
+            },
+            "bgp": {
+                "local_as": "100",
+                "address_family": {
+                    "ipv4": {
+                        "unicast": {
+                            "neighbor": {
+                                "r2": {
+                                    "dest_link": {
+                                        "r1": {}
+                                    }
+                                }
+                            }
+                        }
+                    },
+                    "ipv6": {
+                        "unicast": {
+                            "neighbor": {
+                                "r2": {
+                                    "dest_link": {
+                                        "r1": {}
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+        "r2": {
+            "links": {
+                "lo": {
+                    "ipv4": "auto",
+                    "ipv6": "auto",
+                    "type": "loopback"
+                },
+                "r1": {
+                    "ipv4": "auto",
+                    "ipv6": "auto"
+                },
+                "r3": {
+                    "ipv4": "auto",
+                    "ipv6": "auto"
+                }
+            },
+            "bgp": {
+                "local_as": "200",
+                "address_family": {
+                    "ipv4": {
+                        "unicast": {
+                            "neighbor": {
+                                "r1": {
+                                    "dest_link": {
+                                        "r2": {}
+                                    }
+                                },
+                                "r3": {
+                                    "dest_link": {
+                                        "r2": {}
+                                    }
+                                }
+                            }
+                        }
+                    },
+                    "ipv6": {
+                        "unicast": {
+                            "neighbor": {
+                                "r1": {
+                                    "dest_link": {
+                                        "r2": {}
+                                    }
+                                },
+                                "r3": {
+                                    "dest_link": {
+                                        "r2": {}
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+        "r3": {
+            "links": {
+                "lo": {
+                    "ipv4": "auto",
+                    "ipv6": "auto",
+                    "type": "loopback"
+                },
+                "r2": {
+                    "ipv4": "auto",
+                    "ipv6": "auto"
+                }
+            },
+            "bgp": {
+                "local_as": "300",
+                "address_family": {
+                    "ipv4": {
+                        "unicast": {
+                            "neighbor": {
+                                "r2": {
+                                    "dest_link": {
+                                        "r3": {}
+                                    }
+                                }
+                            }
+                        }
+                    },
+                    "ipv6": {
+                        "unicast": {
+                            "neighbor": {
+                                "r2": {
+                                    "dest_link": {
+                                        "r3": {}
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+        "r0": {
+            "links": {
+                "lo": {
+                    "ipv4": "auto",
+                    "ipv6": "auto",
+                    "type": "loopback"
+                },
+                "r1": {
+                    "ipv4": "auto",
+                    "ipv6": "auto"
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/tests/topotests/bgp_communities_topo1/test_bgp_communities.py b/tests/topotests/bgp_communities_topo1/test_bgp_communities.py
new file mode 100644 (file)
index 0000000..7d960d6
--- /dev/null
@@ -0,0 +1,635 @@
+#!/usr/bin/python
+
+#
+# Copyright (c) 2020 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation,
+# Inc. ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test bgp community functionality:
+- Verify routes are not advertised when NO-ADVERTISE Community is applied
+
+"""
+
+import os
+import sys
+import time
+import json
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from mininet.topo import Topo
+from lib.topogen import Topogen, get_topogen
+
+# Import topoJson from lib, to create topology and initial configuration
+from lib.common_config import (
+    start_topology,
+    write_test_header,
+    write_test_footer,
+    reset_config_on_routers,
+    verify_rib,
+    create_static_routes,
+    check_address_types,
+    step,
+    create_route_maps,
+    create_prefix_lists,
+    create_route_maps,
+)
+from lib.topolog import logger
+from lib.bgp import (
+    verify_bgp_convergence,
+    create_router_bgp,
+    clear_bgp_and_verify,
+    verify_bgp_rib,
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+from copy import deepcopy
+
+# Reading the data from JSON File for topology creation
+jsonFile = "{}/bgp_communities.json".format(CWD)
+try:
+    with open(jsonFile, "r") as topoJson:
+        topo = json.load(topoJson)
+except IOError:
+    assert False, "Could not read file {}".format(jsonFile)
+
+# Global variables
+BGP_CONVERGENCE = False
+ADDR_TYPES = check_address_types()
+NETWORK = {"ipv4": "2.2.2.2/32", "ipv6": "22:22::2/128"}
+NEXT_HOP_IP = {}
+
+
+class BGPCOMMUNITIES(Topo):
+    """
+    Test BGPCOMMUNITIES - topology 1
+
+    * `Topo`: Topology object
+    """
+
+    def build(self, *_args, **_opts):
+        """Build function"""
+        tgen = get_topogen(self)
+
+        # Building topology from json file
+        build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+    """
+    Sets up the pytest environment
+
+    * `mod`: module name
+    """
+
+    testsuite_run_time = time.asctime(time.localtime(time.time()))
+    logger.info("Testsuite start time: {}".format(testsuite_run_time))
+    logger.info("=" * 40)
+
+    logger.info("Running setup_module to create topology")
+
+    # This function initiates the topology build with Topogen...
+    tgen = Topogen(BGPCOMMUNITIES, mod.__name__)
+    # ... and here it calls Mininet initialization functions.
+
+    # Starting topology, create tmp files which are loaded to routers
+    #  to start deamons and then start routers
+    start_topology(tgen)
+
+    # Creating configuration from JSON
+    build_config_from_json(tgen, topo)
+
+    # Checking BGP convergence
+    global BGP_CONVERGENCE
+    global ADDR_TYPES
+
+    # Don't run this test if we have any failure.
+    if tgen.routers_have_failure():
+        pytest.skip(tgen.errors)
+
+    # Api call verify whether BGP is converged
+    BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+    assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error:" " {}".format(
+        BGP_CONVERGENCE
+    )
+
+    logger.info("Running setup_module() done")
+
+
+def teardown_module(mod):
+    """
+    Teardown the pytest environment
+
+    * `mod`: module name
+    """
+
+    logger.info("Running teardown_module to delete topology")
+
+    tgen = get_topogen()
+
+    # Stop toplogy and Remove tmp files
+    tgen.stop_topology()
+
+    logger.info(
+        "Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
+    )
+    logger.info("=" * 40)
+
+
+#####################################################
+#
+#   Tests starting
+#
+#####################################################
+
+
+def test_bgp_no_advertise_community_p0(request):
+    """
+    Verify routes are not advertised when NO-ADVERTISE Community is applied
+
+    """
+
+    tc_name = request.node.name
+    write_test_header(tc_name)
+    tgen = get_topogen()
+    reset_config_on_routers(tgen)
+
+    # Don't run this test if we have any failure.
+    if tgen.routers_have_failure():
+        pytest.skip(tgen.errors)
+
+    NEXT_HOP_IP = {
+        "ipv4": topo["routers"]["r0"]["links"]["r1"]["ipv4"].split("/")[0],
+        "ipv6": topo["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0],
+    }
+
+    # configure static routes
+    dut = "r3"
+    protocol = "bgp"
+
+    for addr_type in ADDR_TYPES:
+        # Enable static routes
+        input_dict = {
+            "r1": {
+                "static_routes": [
+                    {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
+                ]
+            }
+        }
+
+        logger.info("Configure static routes")
+        result = create_static_routes(tgen, input_dict)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        step("configure redistribute static and connected in Router BGP " "in R1")
+
+        input_dict_2 = {
+            "r1": {
+                "bgp": {
+                    "address_family": {
+                        addr_type: {
+                            "unicast": {
+                                "redistribute": [
+                                    {"redist_type": "static"},
+                                    {"redist_type": "connected"},
+                                ]
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        result = create_router_bgp(tgen, topo, input_dict_2)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        step(
+            "BGP neighbors are up, static and connected route advertised from"
+            " R1 are present on R2 BGP table and RIB using show ip bgp and "
+            " show ip route"
+        )
+        step(
+            "Static and connected route advertised from R1 are present on R3"
+            " BGP table and RIB using show ip bgp and show ip route"
+        )
+
+        dut = "r3"
+        protocol = "bgp"
+        result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        step("Configure prefix list P1 on R2 to permit route coming from R1")
+        # Create ip prefix list
+        input_dict_2 = {
+            "r2": {
+                "prefix_lists": {
+                    addr_type: {
+                        "pf_list_1_{}".format(addr_type): [
+                            {"seqid": 10, "network": "any", "action": "permit"}
+                        ]
+                    }
+                }
+            }
+        }
+        result = create_prefix_lists(tgen, input_dict_2)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        # Create route map
+        input_dict_3 = {
+            "r2": {
+                "route_maps": {
+                    "rmap_match_pf_1_{}".format(addr_type): [
+                        {
+                            "action": "permit",
+                            "seq_id": "5",
+                            "match": {
+                                addr_type: {"prefix_lists": "pf_list_1_" + addr_type}
+                            },
+                            "set": {"community": {"num": "no-advertise"}},
+                        }
+                    ]
+                }
+            }
+        }
+        result = create_route_maps(tgen, input_dict_3)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+        step(
+            "Apply route-map RM1 on R2, R2 to R3 BGP neighbor with no"
+            " advertise community"
+        )
+        # Configure neighbor for route map
+        input_dict_4 = {
+            "r2": {
+                "bgp": {
+                    "address_family": {
+                        addr_type: {
+                            "unicast": {
+                                "neighbor": {
+                                    "r1": {
+                                        "dest_link": {
+                                            "r2": {
+                                                "route_maps": [
+                                                    {
+                                                        "name": "rmap_match_pf_1_"
+                                                        + addr_type,
+                                                        "direction": "in",
+                                                    }
+                                                ]
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        result = create_router_bgp(tgen, topo, input_dict_4)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        step(
+            "After advertising no advertise community to BGP neighbor "
+            "static and connected router got removed from R3 verify using "
+            "show ip bgp & show ip route"
+        )
+
+        result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False)
+        assert result is not True, "Testcase {} : Failed \n "
+        " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+        result = verify_rib(
+            tgen, addr_type, dut, input_dict, protocol=protocol, expected=False
+        )
+        assert result is not True, "Testcase {} : Failed \n "
+        " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+        step("Remove and Add no advertise community")
+        # Configure neighbor for route map
+        input_dict_4 = {
+            "r2": {
+                "bgp": {
+                    "address_family": {
+                        addr_type: {
+                            "unicast": {
+                                "neighbor": {
+                                    "r1": {
+                                        "dest_link": {
+                                            "r2": {
+                                                "route_maps": [
+                                                    {
+                                                        "name": "rmap_match_pf_1_"
+                                                        + addr_type,
+                                                        "direction": "in",
+                                                        "delete": True,
+                                                    }
+                                                ]
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        result = create_router_bgp(tgen, topo, input_dict_4)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        step(
+            "After removing no advertise community from BGP neighbor "
+            "static and connected router got advertised to R3 and "
+            "removing route-map, verify route using show ip bgp"
+            " and show ip route"
+        )
+
+        result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+        assert result is True, "Testcase {} : Failed \n "
+        " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+        result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol)
+        assert result is True, "Testcase {} : Failed \n "
+        " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+    step("Repeat above steps when IBGP nbr configured between R1, R2 & R2, R3")
+    topo1 = deepcopy(topo)
+
+    topo1["routers"]["r1"]["bgp"]["local_as"] = "100"
+    topo1["routers"]["r2"]["bgp"]["local_as"] = "100"
+    topo1["routers"]["r3"]["bgp"]["local_as"] = "100"
+
+    for rtr in ["r1", "r2", "r3"]:
+        if "bgp" in topo1["routers"][rtr].keys():
+            delete_bgp = {rtr: {"bgp": {"delete": True}}}
+            result = create_router_bgp(tgen, topo1, delete_bgp)
+            assert result is True, "Testcase {} : Failed \n Error: {}".format(
+                tc_name, result
+            )
+            config_bgp = {
+                rtr: {"bgp": {"local_as": topo1["routers"][rtr]["bgp"]["local_as"]}}
+            }
+            result = create_router_bgp(tgen, topo1, config_bgp)
+            assert result is True, "Testcase {} : Failed \n Error: {}".format(
+                tc_name, result
+            )
+
+    build_config_from_json(tgen, topo1, save_bkup=False)
+
+    step("verify bgp convergence before starting test case")
+
+    bgp_convergence = verify_bgp_convergence(tgen, topo1)
+    assert bgp_convergence is True, "Testcase {} : Failed \n Error: {}".format(
+        tc_name, bgp_convergence
+    )
+
+    # configure static routes
+    dut = "r3"
+    protocol = "bgp"
+
+    for addr_type in ADDR_TYPES:
+        # Enable static routes
+        input_dict = {
+            "r1": {
+                "static_routes": [
+                    {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
+                ]
+            }
+        }
+
+        logger.info("Configure static routes")
+        result = create_static_routes(tgen, input_dict)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        step("configure redistribute static and connected in Router " "BGP in R1")
+
+        input_dict_2 = {
+            "r1": {
+                "bgp": {
+                    "address_family": {
+                        addr_type: {
+                            "unicast": {
+                                "redistribute": [
+                                    {"redist_type": "static"},
+                                    {"redist_type": "connected"},
+                                ]
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        result = create_router_bgp(tgen, topo, input_dict_2)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        step(
+            "BGP neighbors are up, static and connected route advertised from"
+            " R1 are present on R2 BGP table and RIB using show ip bgp and "
+            " show ip route"
+        )
+        step(
+            "Static and connected route advertised from R1 are present on R3"
+            " BGP table and RIB using show ip bgp and show ip route"
+        )
+
+        dut = "r2"
+        protocol = "bgp"
+        result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        step("Configure prefix list P1 on R2 to permit route coming from R1")
+        # Create ip prefix list
+        input_dict_2 = {
+            "r2": {
+                "prefix_lists": {
+                    addr_type: {
+                        "pf_list_1_{}".format(addr_type): [
+                            {"seqid": 10, "network": "any", "action": "permit"}
+                        ]
+                    }
+                }
+            }
+        }
+        result = create_prefix_lists(tgen, input_dict_2)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        # Create route map
+        input_dict_3 = {
+            "r2": {
+                "route_maps": {
+                    "rmap_match_pf_1_{}".format(addr_type): [
+                        {
+                            "action": "permit",
+                            "seq_id": "5",
+                            "match": {
+                                addr_type: {"prefix_lists": "pf_list_1_" + addr_type}
+                            },
+                            "set": {"community": {"num": "no-advertise"}},
+                        }
+                    ]
+                }
+            }
+        }
+        result = create_route_maps(tgen, input_dict_3)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+        step(
+            "Apply route-map RM1 on R2, R2 to R3 BGP neighbor with no"
+            " advertise community"
+        )
+
+        # Configure neighbor for route map
+        input_dict_4 = {
+            "r2": {
+                "bgp": {
+                    "address_family": {
+                        addr_type: {
+                            "unicast": {
+                                "neighbor": {
+                                    "r1": {
+                                        "dest_link": {
+                                            "r2": {
+                                                "route_maps": [
+                                                    {
+                                                        "name": "rmap_match_pf_1_"
+                                                        + addr_type,
+                                                        "direction": "in",
+                                                    }
+                                                ]
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        result = create_router_bgp(tgen, topo, input_dict_4)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        step(
+            "After advertising no advertise community to BGP neighbor "
+            "static and connected router got removed from R3 verify using "
+            "show ip bgp & show ip route"
+        )
+
+        result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+        assert result is True, "Testcase {} : Failed \n "
+        " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+        result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol)
+        assert result is True, "Testcase {} : Failed \n "
+        " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+        step("Remove and Add no advertise community")
+        # Configure neighbor for route map
+        input_dict_4 = {
+            "r2": {
+                "bgp": {
+                    "address_family": {
+                        addr_type: {
+                            "unicast": {
+                                "neighbor": {
+                                    "r1": {
+                                        "dest_link": {
+                                            "r2": {
+                                                "route_maps": [
+                                                    {
+                                                        "name": "rmap_match_pf_1_"
+                                                        + addr_type,
+                                                        "direction": "in",
+                                                        "delete": True,
+                                                    }
+                                                ]
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        result = create_router_bgp(tgen, topo, input_dict_4)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+
+        step(
+            "After removing no advertise community from BGP neighbor "
+            "static and connected router got advertised to R3 and "
+            "removing route verify using show ip bgp and "
+            " show ip route"
+        )
+
+        result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+        assert result is True, "Testcase {} : Failed \n "
+        " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+        result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol)
+        assert result is True, "Testcase {} : Failed \n "
+        " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+    write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+    args = ["-s"] + sys.argv[1:]
+    sys.exit(pytest.main(args))
index 26181f09745123422efbf0cfe4bd4fa1647ff1ba..b8dd7cda58a19e1ec872e128c7892b03b8d74a29 100644 (file)
@@ -27,14 +27,17 @@ from lib import topotest
 from lib.topolog import logger
 
 # Import common_config to use commomnly used APIs
-from lib.common_config import (create_common_configuration,
-                               InvalidCLIError,
-                               load_config_to_router,
-                               check_address_types,
-                               generate_ips,
-                               validate_ip_address,
-                               find_interface_with_greater_ip,
-                               run_frr_cmd, retry)
+from lib.common_config import (
+    create_common_configuration,
+    InvalidCLIError,
+    load_config_to_router,
+    check_address_types,
+    generate_ips,
+    validate_ip_address,
+    find_interface_with_greater_ip,
+    run_frr_cmd,
+    retry,
+)
 
 BGP_CONVERGENCE_TIMEOUT = 10
 
@@ -487,13 +490,11 @@ def __create_bgp_unicast_address_family(
 
             if "allowas_in" in peer:
                 allow_as_in = peer["allowas_in"]
-                config_data.append("{} allowas-in {}".format(neigh_cxt,
-                                                             allow_as_in))
+                config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in))
 
             if "no_allowas_in" in peer:
                 allow_as_in = peer["no_allowas_in"]
-                config_data.append("no {} allowas-in {}".format(neigh_cxt,
-                                                                allow_as_in))
+                config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))
             if prefix_lists:
                 for prefix_list in prefix_lists:
                     name = prefix_list.setdefault("name", {})
@@ -529,12 +530,10 @@ def __create_bgp_unicast_address_family(
                         config_data.append(cmd)
 
             if allowas_in:
-                number_occurences = allowas_in.\
-                    setdefault("number_occurences", {})
+                number_occurences = allowas_in.setdefault("number_occurences", {})
                 del_action = allowas_in.setdefault("delete", False)
 
-                cmd = "{} allowas-in {}".format(neigh_cxt,
-                                                number_occurences)
+                cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences)
 
                 if del_action:
                     cmd = "no {}".format(cmd)
@@ -640,6 +639,9 @@ def verify_bgp_convergence(tgen, topo):
 
     logger.debug("Entering lib API: verify_bgp_convergence()")
     for router, rnode in tgen.routers().iteritems():
+        if "bgp" not in topo["routers"][router]:
+            continue
+
         logger.info("Verifying BGP Convergence on router %s", router)
         show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
         # Verifying output dictionary show_bgp_json is empty or not
@@ -1031,7 +1033,7 @@ def clear_bgp_and_verify(tgen, topo, router):
             " router {}".format(router)
         )
         return errormsg
-    logger.info(peer_uptime_after_clear_bgp)
+
     # Comparing peerUptimeEstablishedEpoch dictionaries
     if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
         logger.info("BGP neighborship is reset after clear BGP on router %s", router)
@@ -1750,9 +1752,9 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
 
             # Static routes
             sleep(2)
-            logger.info('Checking router {} BGP RIB:'.format(dut))
+            logger.info("Checking router {} BGP RIB:".format(dut))
 
-            if 'static_routes' in input_dict[routerInput]:
+            if "static_routes" in input_dict[routerInput]:
                 static_routes = input_dict[routerInput]["static_routes"]
 
                 for static_route in static_routes:
@@ -1762,12 +1764,10 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
                     nh_found = False
                     vrf = static_route.setdefault("vrf", None)
                     if vrf:
-                        cmd = "{} vrf {} {}".\
-                            format(command, vrf, addr_type)
+                        cmd = "{} vrf {} {}".format(command, vrf, addr_type)
 
                     else:
-                        cmd = "{} {}".\
-                            format(command, addr_type)
+                        cmd = "{} {}".format(command, addr_type)
 
                     cmd = "{} json".format(cmd)
 
@@ -1775,8 +1775,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
 
                     # Verifying output dictionary rib_routes_json is not empty
                     if bool(rib_routes_json) == False:
-                        errormsg = "No route found in rib of router {}..". \
-                            format(router)
+                        errormsg = "No route found in rib of router {}..".format(router)
                         return errormsg
 
                     network = static_route["network"]
@@ -1804,64 +1803,84 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
                                 if not isinstance(next_hop, list):
                                     next_hop = [next_hop]
                                     list1 = next_hop
-                                found_hops = [rib_r["ip"] for rib_r in
-                                              rib_routes_json["routes"][
-                                                  st_rt][0]["nexthops"]]
+                                found_hops = [
+                                    rib_r["ip"]
+                                    for rib_r in rib_routes_json["routes"][st_rt][0][
+                                        "nexthops"
+                                    ]
+                                ]
                                 list2 = found_hops
-                                missing_list_of_nexthops = \
-                                    set(list2).difference(list1)
-                                additional_nexthops_in_required_nhs = \
-                                    set(list1).difference(list2)
+                                missing_list_of_nexthops = set(list2).difference(list1)
+                                additional_nexthops_in_required_nhs = set(
+                                    list1
+                                ).difference(list2)
 
                                 if list2:
                                     if additional_nexthops_in_required_nhs:
-                                        logger.info("Missing nexthop %s for route"\
-                                        " %s in RIB of router %s\n", \
-                                        additional_nexthops_in_required_nhs,  \
-                                        st_rt, dut)
-                                        errormsg=("Nexthop {} is Missing for "\
-                                        "route {} in RIB of router {}\n".format(
+                                        logger.info(
+                                            "Missing nexthop %s for route"
+                                            " %s in RIB of router %s\n",
                                             additional_nexthops_in_required_nhs,
-                                            st_rt, dut))
+                                            st_rt,
+                                            dut,
+                                        )
+                                        errormsg = (
+                                            "Nexthop {} is Missing for "
+                                            "route {} in RIB of router {}\n".format(
+                                                additional_nexthops_in_required_nhs,
+                                                st_rt,
+                                                dut,
+                                            )
+                                        )
                                         return errormsg
                                     else:
                                         nh_found = True
                             if aspath:
-                                found_paths = rib_routes_json["routes"][
-                                    st_rt][0]["path"]
+                                found_paths = rib_routes_json["routes"][st_rt][0][
+                                    "path"
+                                ]
                                 if aspath == found_paths:
                                     aspath_found = True
-                                    logger.info("Found AS path {} for route" \
-                                        " {} in RIB of router "\
-                                            "{}\n".format(aspath, st_rt, dut))
+                                    logger.info(
+                                        "Found AS path {} for route"
+                                        " {} in RIB of router "
+                                        "{}\n".format(aspath, st_rt, dut)
+                                    )
                                 else:
-                                    errormsg=("AS Path {} is missing for route"\
-                                        "for route {} in RIB of router {}\n"\
-                                            .format(aspath, st_rt, dut))
+                                    errormsg = (
+                                        "AS Path {} is missing for route"
+                                        "for route {} in RIB of router {}\n".format(
+                                            aspath, st_rt, dut
+                                        )
+                                    )
                                     return errormsg
 
                         else:
                             missing_routes.append(st_rt)
 
                     if nh_found:
-                        logger.info("Found next_hop {} for all bgp"
-                                    " routes in RIB of"
-                                    " router {}\n".format(next_hop, \
-                                                          router))
+                        logger.info(
+                            "Found next_hop {} for all bgp"
+                            " routes in RIB of"
+                            " router {}\n".format(next_hop, router)
+                        )
 
                     if len(missing_routes) > 0:
-                        errormsg = ("Missing route in RIB of router {}, "
-                                    "routes: {}\n".format(dut, missing_routes))
+                        errormsg = (
+                            "Missing route in RIB of router {}, "
+                            "routes: {}\n".format(dut, missing_routes)
+                        )
                         return errormsg
 
                     if found_routes:
-                        logger.info("Verified routes in router {} BGP RIB, "
-                                    "found routes are: {} \n".\
-                                        format(dut, found_routes))
+                        logger.info(
+                            "Verified routes in router {} BGP RIB, "
+                            "found routes are: {} \n".format(dut, found_routes)
+                        )
                 continue
 
             if "bgp" not in input_dict[routerInput]:
-               continue
+                continue
 
             # Advertise networks
             bgp_data_list = input_dict[routerInput]["bgp"]
@@ -1872,36 +1891,33 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
             for bgp_data in bgp_data_list:
                 vrf_id = bgp_data.setdefault("vrf", None)
                 if vrf_id:
-                    cmd = "{} vrf {} {}".\
-                        format(command, vrf_id, addr_type)
+                    cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
                 else:
-                    cmd = "{} {}".\
-                        format(command, addr_type)
+                    cmd = "{} {}".format(command, addr_type)
 
                 cmd = "{} json".format(cmd)
 
-                rib_routes_json =  run_frr_cmd(rnode, cmd, isjson=True)
+                rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
 
                 # Verifying output dictionary rib_routes_json is not empty
                 if bool(rib_routes_json) == False:
-                    errormsg = "No route found in rib of router {}..". \
-                        format(router)
+                    errormsg = "No route found in rib of router {}..".format(router)
                     return errormsg
 
                 bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
-                advertise_network = \
-                    bgp_net_advertise.setdefault("advertise_networks", [])
+                advertise_network = bgp_net_advertise.setdefault(
+                    "advertise_networks", []
+                )
 
                 for advertise_network_dict in advertise_network:
                     found_routes = []
                     missing_routes = []
                     found = False
 
-                    network = advertise_network_dict['network']
+                    network = advertise_network_dict["network"]
 
-                    if 'no_of_network' in advertise_network_dict:
-                        no_of_network = advertise_network_dict[
-                            'no_of_network']
+                    if "no_of_network" in advertise_network_dict:
+                        no_of_network = advertise_network_dict["no_of_network"]
                     else:
                         no_of_network = 1
 
@@ -1923,13 +1939,17 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
                             missing_routes.append(st_rt)
 
                     if len(missing_routes) > 0:
-                        errormsg = ("Missing route in BGP RIB of router {},"
-                                    " are: {}\n".format(dut, missing_routes))
+                        errormsg = (
+                            "Missing route in BGP RIB of router {},"
+                            " are: {}\n".format(dut, missing_routes)
+                        )
                         return errormsg
 
                     if found_routes:
-                        logger.info("Verified routes in router {} BGP RIB, found "
-                                    "routes are: {}\n".format(dut, found_routes))
+                        logger.info(
+                            "Verified routes in router {} BGP RIB, found "
+                            "routes are: {}\n".format(dut, found_routes)
+                        )
 
     logger.debug("Exiting lib API: verify_bgp_rib()")
     return True
index 5ee59070cc4ad7f0d2faed40f60a8b5ffd876c20..2483930d18133d0b65be42874c8912fc85f2119d 100644 (file)
@@ -1198,7 +1198,7 @@ def create_route_maps(tgen, input_dict, build=False):
                         ipv6_data = set_data.setdefault("ipv6", {})
                         local_preference = set_data.setdefault("locPrf", None)
                         metric = set_data.setdefault("metric", None)
-                        as_path = set_data.setdefault("path", {})
+                        as_path = set_data.setdefault("aspath", {})
                         weight = set_data.setdefault("weight", None)
                         community = set_data.setdefault("community", {})
                         large_community = set_data.setdefault("large_community", {})