diff options
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/topotests/bgp_gr_functionality_topo1/bgp_gr_topojson_topo1.json | 115 | ||||
| -rwxr-xr-x | tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py | 3495 | ||||
| -rw-r--r-- | tests/topotests/lib/bgp.py | 341 | ||||
| -rw-r--r-- | tests/topotests/lib/common_config.py | 129 |
4 files changed, 3846 insertions, 234 deletions
diff --git a/tests/topotests/bgp_gr_functionality_topo1/bgp_gr_topojson_topo1.json b/tests/topotests/bgp_gr_functionality_topo1/bgp_gr_topojson_topo1.json new file mode 100644 index 0000000000..7b3cac814f --- /dev/null +++ b/tests/topotests/bgp_gr_functionality_topo1/bgp_gr_topojson_topo1.json @@ -0,0 +1,115 @@ +{ + "ipv4base":"192.168.0.0", + "ipv4mask":24, + "ipv6base":"fd00::", + "ipv6mask":64, + "link_ip_start":{"ipv4":"192.168.0.0", "v4mask":24, "ipv6":"fd00::", "v6mask":64}, + "lo_prefix":{"ipv4":"1.0.", "v4mask":32, "ipv6":"2001:DB8:F::", "v6mask":128}, + "routers":{ + "r1":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r2-link1": {"ipv4":"auto", "ipv6":"auto"}, + "r2-link2": {"ipv4":"auto", "ipv6":"auto"} + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"} + ], + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": { + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"} + ], + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": { + } + } + } + } + } + } + } + }, + "static_routes":[ + { + "network":"100.0.10.1/32", + "no_of_ip":5, + "next_hop":"192.168.1.2" + }, + { + "network":"1::1/128", + "no_of_ip":5, + "next_hop":"fd00:0:0:1::2" + }]}, + "r2":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r1-link1": {"ipv4":"auto", "ipv6":"auto"}, + "r1-link2": {"ipv4":"auto", "ipv6":"auto"} + }, + "bgp": { + "local_as": "200", + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"} + ], + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": { + } + } + } + } + } + }, + "ipv6": { + "unicast": { + "redistribute": [ + {"redist_type": "static"} + ], + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": { + } + } + } + } + } + } + } + }, + "static_routes":[ + { + "network":"200.0.20.1/32", + "no_of_ip":5, + "next_hop":"192.168.1.1" + }, + { + "network":"2::1/128", + "no_of_ip":5, + "next_hop":"fd00:0:0:1::1" + }] + } + } +} diff --git a/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py new file mode 100755 index 0000000000..fdc1bed522 --- /dev/null +++ b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py @@ -0,0 +1,3495 @@ +#!/usr/bin/env python +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF") +# in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +Following tests are covered to test BGP Gracefull Restart functionality. +Basic Common Test steps for all the test case below : +- Create topology (setup module) + Creating 2 routers topology, r1, r2 in IBGP +- Bring up topology +- Verify for bgp to converge +- Configure BGP Garceful Restart on both the routers. + +1. Helper BGP router R1, mark and unmark IPV4 routes + as stale as the restarting router R2 come up within the restart time. +2. Helper BGP router R1, mark IPV4 routes as stale and + deletes them as the restarting router R2 did-not come up within restart + time. +3. Restart BGP router R1, detects it is connected to R2, + which is a helper router. Verify the restart capability i.e. R bit + are sent after R1 reloads and comes back. +4. Verify that the restarting node sets "R" bit while sending the + BGP open messages after the node restart, only if GR is enabled. +5. Verify if restarting node resets R bit in BGP open message + during normal BGP session flaps as well, even when GR restarting mode is enabled. + Here link flap happen due to interface UP/DOWN. +6. Verify if restarting node resets R bit in BGP open message + during normal BGP session flaps as well, even when GR restarting mode is enabled. + Here link flap happen due to neigh router restarts +7. Verify if restarting node resets R bit in BGP open message + during normal BGP session flaps when GR helper mode is enabled. + Here link flap happen due to interface UP/DOWN. +8. Verify if restarting node resets R bit in BGP open message + during normal BGP session flaps when GR helper mode is enabled. + Here link flap happen due to neigh router restarts. +9. Verify that restarting nodes set "F" bit while sending + the BGP open messages after it restarts, only when BGP GR is enabled. +10. Verify that restarting nodes reset "F" bit while sending + the BGP open messages after it's restarts, when BGP GR is **NOT** enabled. +11. Verify that only GR helper routers keep the stale + route entries, not any GR disabled router. +12. Verify that GR helper routers keeps all the routes received + from restarting node if both the routers are configured as GR restarting node. +13. Verify that GR helper routers delete all the routes + received from a node if both the routers are configured as GR helper node. +14. Test Objective : After BGP neighborship is established and GR capability + is exchanged, transition helper router to disabled state. +15.Test Objective : After BGP neighborship is established and GR capability + is exchanged, transition disabled router to helper state. +16. Verify transition from Global Restarting to Disable and then + Global Disable to Restarting. +17. Verify transition from Global Helper to Disable and then Global + Disable to Helper. +18. Verify transition from Global Restart to Helper and then Global + Helper to Restart. +19. Verify transition from Peer-level helper to Global Restarting. +20. Verify transition from Peer-level restart to Global Restart. +21. Verify transition from Peer-level disabled to Global Restart. +22. Verify Peer-level inherit from Global Restarting mode. +23. Verify transition from Peer-level helper to Global inherit helper. +24. Verify transition from Peer-level restart to Global inherit helper. +25. Verify transition from Peer-level disbale to Global inherit helper. +26. Verify default GR functional mode is Helper. +27. Verify transition from Peer-level Helper to Global Disable. +28. Verify transition from Peer-level Restarting to Global Disable. +29. Verify transition from Peer-level Disable to Global Disable. +30. Verfiy Peer-level inherit from Global Disable mode. + +""" + +import os +import sys +import json +import time +import inspect +import pytest +from time import sleep + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join("../")) +sys.path.append(os.path.join("../lib/")) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib import topotest +from lib.topogen import Topogen, TopoRouter, get_topogen +from lib.topolog import logger + +# Required to instantiate the topology builder class. +from mininet.topo import Topo + +# Import topoJson from lib, to create topology and initial configuration +from lib.topojson import build_topo_from_json, build_config_from_json +from lib.bgp import ( + clear_bgp, + verify_bgp_rib, + verify_graceful_restart, + create_router_bgp, + verify_r_bit, + verify_f_bit, + verify_bgp_convergence, + verify_graceful_restart_timers, +) + +from lib.common_config import ( + write_test_header, + reset_config_on_routers, + start_topology, + kill_router_daemons, + start_router_daemons, + verify_rib, + check_address_types, + write_test_footer, + check_router_status, + shutdown_bringup_interface, + step, + kill_mininet_routers_process, + get_frr_ipv6_linklocal, + create_route_maps, +) + +# Reading the data from JSON File for topology and configuration creation +jsonFile = "{}/bgp_gr_topojson_topo1.json".format(CWD) +try: + with open(jsonFile, "r") as topoJson: + topo = json.load(topoJson) +except IOError: + logger.info("Could not read file:", jsonFile) + + +# Global variables +NEXT_HOP_IP = {"ipv4": "192.168.1.10", "ipv6": "fd00:0:0:1::10"} +NEXT_HOP_IP_1 = {"ipv4": "192.168.0.1", "ipv6": "fd00::1"} +NEXT_HOP_IP_2 = {"ipv4": "192.168.0.2", "ipv6": "fd00::2"} +BGP_CONVERGENCE = False +GR_RESTART_TIMER = 20 +PREFERRED_NEXT_HOP = "link_local" + + +class GenerateTopo(Topo): + """ + Test topology builder + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + "Build function" + tgen = get_topogen(self) + + # This function only purpose is to create topology + # as defined in input json file. + # + # Create topology (setup module) + # Creating 2 routers topology, r1, r2in IBGP + # Bring up topology + + # Building topology from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + global ADDR_TYPES + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(GenerateTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Kill stale mininet routers and process + kill_mininet_routers_process(tgen) + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Api call verify whether BGP is converged + ADDR_TYPES = check_address_types() + + for addr_type in ADDR_TYPES: + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module : Failed \n Error:" " {}".format( + BGP_CONVERGENCE + ) + + logger.info("Running setup_module() done") + + +def teardown_module(mod): + """ + Teardown the pytest environment + + * `mod`: module name + """ + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info( + "Testsuite end time: {}".format(time.asctime(time.localtime(time.time()))) + ) + logger.info("=" * 40) + + +def configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut, peer): + """ + This function groups the repetitive function calls into one function. + """ + + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + for addr_type in ADDR_TYPES: + clear_bgp(tgen, addr_type, dut) + + return True + + +def next_hop_per_address_family( + tgen, dut, peer, addr_type, next_hop_dict, preferred_next_hop=PREFERRED_NEXT_HOP +): + """ + This function returns link_local or global next_hop per address-family + """ + + intferface = topo["routers"][peer]["links"]["{}-link1".format(dut)]["interface"] + if addr_type == "ipv6" and "link_local" in preferred_next_hop: + next_hop = get_frr_ipv6_linklocal(tgen, peer, intf=intferface) + else: + next_hop = next_hop_dict[addr_type] + + return next_hop + + +def test_BGP_GR_TC_46_p1(request): + """ + Test Objective : transition from Peer-level helper to Global Restarting + Global Mode : GR Restarting + PerPeer Mode : GR Helper + GR Mode effective : GR Helper + + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + step( + "Configure R1 and R2 as GR restarting node in global" + " and helper in per-Peer-level" + ) + + input_dict = { + "r1": { + "bgp": { + "graceful-restart": {"graceful-restart": True,}, + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + }, + } + }, + "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + step("Verify on R2 that R1 advertises GR capabilities as a restarting node") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step("Kill BGP on R2") + + kill_router_daemons(tgen, "r2", ["bgpd"]) + + step( + "Verify that R1 keeps the stale entries in RIB & FIB and R2 keeps stale entries in FIB using" + ) + + for addr_type in ADDR_TYPES: + protocol = "bgp" + next_hop = next_hop_per_address_family( + tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1 + ) + input_topo = {"r1": topo["routers"]["r1"]} + result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + next_hop = next_hop_per_address_family( + tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2 + ) + input_topo = {"r2": topo["routers"]["r2"]} + result = verify_bgp_rib(tgen, addr_type, "r1", input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step( + "Bring up BGP on R1 and remove Peer-level GR config" + " from R1 following by a session reset" + ) + + start_router_daemons(tgen, "r2", ["bgpd"]) + + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": False} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": False} + } + } + } + } + }, + } + } + } + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + step("Verify on R2 that R1 advertises GR capabilities as a restarting node") + + input_dict = { + "r1": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, + "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, + } + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step("Kill BGP on R1") + + kill_router_daemons(tgen, "r1", ["bgpd"]) + + step( + "Verify that R1 keeps the stale entries in FIB command and R2 keeps stale entries in RIB & FIB" + ) + + for addr_type in ADDR_TYPES: + protocol = "bgp" + next_hop = next_hop_per_address_family( + tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2 + ) + input_topo = {"r2": topo["routers"]["r2"]} + result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol) + assert ( + result is True + ), "Testcase {} : Failed \n Routes are still present \n Error {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + next_hop = next_hop_per_address_family( + tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1 + ) + input_topo = {"r1": topo["routers"]["r1"]} + result = verify_bgp_rib(tgen, addr_type, "r2", input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol) + assert ( + result is True + ), "Testcase {} : Failed \n Routes are still present \n Error {}".format( + tc_name, result + ) + + step("Start BGP on R1") + + start_router_daemons(tgen, "r1", ["bgpd"]) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_50_p1(request): + """ + Test Objective : Transition from Peer-level helper to Global inherit helper + Global Mode : None + PerPeer Mode : Helper + GR Mode effective : GR Helper + + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + step( + "Configure R1 as GR helper node at per Peer-level for R2" + " and configure R2 as global restarting node." + ) + + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + step("Verify on R2 that R1 advertises GR capabilities as a helper node") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step("Kill BGP on R2") + + kill_router_daemons(tgen, "r2", ["bgpd"]) + + step( + "Verify that R2 keeps the stale entries in FIB & R1 keeps stale entries in RIB & FIB" + ) + + for addr_type in ADDR_TYPES: + protocol = "bgp" + next_hop = next_hop_per_address_family( + tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1 + ) + input_topo = {"r1": topo["routers"]["r1"]} + result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + next_hop = next_hop_per_address_family( + tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2 + ) + input_topo = {"r2": topo["routers"]["r2"]} + result = verify_bgp_rib(tgen, addr_type, "r1", input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step("Bring up BGP on R2 and remove Peer-level GR config from R1 ") + + start_router_daemons(tgen, "r2", ["bgpd"]) + + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": False} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": False} + } + } + } + } + }, + } + } + } + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + step("Verify on R2 that R1 still advertises GR capabilities as a helper node") + + input_dict = { + "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True}}}, + "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, + } + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step("Kill BGP on R2") + + kill_router_daemons(tgen, "r2", ["bgpd"]) + + step( + "Verify that R2 keeps the stale entries in FIB & R1 keeps stale entries in RIB & FIB" + ) + + for addr_type in ADDR_TYPES: + protocol = "bgp" + next_hop = next_hop_per_address_family( + tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1 + ) + input_topo = {"r1": topo["routers"]["r1"]} + result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol) + assert ( + result is True + ), "Testcase {} : Failed \n Routes are still present \n Error {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + next_hop = next_hop_per_address_family( + tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2 + ) + input_topo = {"r2": topo["routers"]["r2"]} + result = verify_bgp_rib(tgen, addr_type, "r1", input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol) + assert ( + result is True + ), "Testcase {} : Failed \n Routes are still present \n Error {}".format( + tc_name, result + ) + + step("Start BGP on R2") + + start_router_daemons(tgen, "r2", ["bgpd"]) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_51_p1(request): + """ + Test Objective : Transition from Peer-level restarting to Global inherit helper + Global Mode : None + PerPeer Mode : GR Restart + GR Mode effective : GR Restart + + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + step("Configure R1 as GR restarting node at per Peer-level for R2") + + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + step("Verify on R2 that R1 advertises GR capabilities as a restarting node") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step("Kill BGP on R1") + + kill_router_daemons(tgen, "r1", ["bgpd"]) + + step( + "Verify that R1 keeps the stale entries in FIB & R2 keeps stale entries in RIB & FIB" + ) + + for addr_type in ADDR_TYPES: + protocol = "bgp" + next_hop = next_hop_per_address_family( + tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2 + ) + input_topo = {"r2": topo["routers"]["r2"]} + result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + next_hop = next_hop_per_address_family( + tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1 + ) + input_topo = {"r1": topo["routers"]["r1"]} + result = verify_bgp_rib(tgen, addr_type, "r2", input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step("Bring up BGP on R1 and remove Peer-level GR config") + + start_router_daemons(tgen, "r1", ["bgpd"]) + + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": False} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": False} + } + } + } + } + }, + } + } + } + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + step("Verify on R2 that R1 advertises GR capabilities as a helper node") + + input_dict = { + "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True}}}, + "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, + } + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step("Kill BGPd on R2") + + kill_router_daemons(tgen, "r2", ["bgpd"]) + + step( + "Verify that R2 keeps the stale entries in FIB & R1 keeps stale entries in RIB & FIB" + ) + + for addr_type in ADDR_TYPES: + protocol = "bgp" + next_hop = next_hop_per_address_family( + tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1 + ) + input_topo = {"r1": topo["routers"]["r1"]} + result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol) + assert ( + result is True + ), "Testcase {} : Failed \n Routes are still present \n Error {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + next_hop = next_hop_per_address_family( + tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2 + ) + input_topo = {"r2": topo["routers"]["r2"]} + result = verify_bgp_rib(tgen, addr_type, "r1", input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol) + assert ( + result is True + ), "Testcase {} : Failed \n Routes are still present \n Error {}".format( + tc_name, result + ) + + step("Start BGP on R2") + + start_router_daemons(tgen, "r2", ["bgpd"]) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_53_p1(request): + """ + Test Objective : Default GR functional mode is Helper. + Global Mode : None + PerPeer Mode : None + GR Mode effective : GR Helper + + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + step("configure R2 as global restarting node") + + input_dict = {"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}} + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + step( + "Verify on R2 that R1 advertises GR capabilities as a helper node based on inherit" + ) + + input_dict = { + "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True}}}, + "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}, + } + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step("Kill BGPd on R2") + + kill_router_daemons(tgen, "r2", ["bgpd"]) + + step( + "Verify that R2 keeps the stale entries in FIB & R1 keeps stale entries in RIB & FIB" + ) + + for addr_type in ADDR_TYPES: + protocol = "bgp" + next_hop = next_hop_per_address_family( + tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1 + ) + input_topo = {"r1": topo["routers"]["r1"]} + result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + for addr_type in ADDR_TYPES: + next_hop = next_hop_per_address_family( + tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2 + ) + input_topo = {"r2": topo["routers"]["r2"]} + result = verify_bgp_rib(tgen, addr_type, "r1", input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + step("Start BGP on R2") + + start_router_daemons(tgen, "r2", ["bgpd"]) + + write_test_footer(tc_name) + + +def test_BGP_GR_UTP_1_3_p0(request): + """ + Test Objective : Helper BGP router R1, mark and unmark IPV4 routes + as stale as the restarting router R2 come up within the restart time + + Test Objective : Helper BGP router R1, mark IPV4 routes as stale and + deletes them as the restarting router R2 did-not come up within + restart time. + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Create route-map to prefer global next-hop + input_dict = { + "r1": { + "route_maps": { + "rmap_global": [ + {"action": "permit", "set": {"ipv6": {"nexthop": "prefer-global"}}} + ] + } + }, + "r2": { + "route_maps": { + "rmap_global": [ + {"action": "permit", "set": {"ipv6": {"nexthop": "prefer-global"}}} + ] + } + }, + } + result = create_route_maps(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + # Configure neighbor for route map + input_dict_1 = { + "r1": { + "bgp": { + "address_family": { + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": { + "route_maps": [ + { + "name": "rmap_global", + "direction": "in", + } + ] + } + } + } + } + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": { + "route_maps": [ + { + "name": "rmap_global", + "direction": "in", + } + ] + } + } + } + } + } + } + } + } + }, + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + "r2": { + "bgp": { + "graceful-restart": {"timer": {"restart-time": GR_RESTART_TIMER}}, + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + }, + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r2", peer="r1") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r2", peer="r1" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2, preferred_next_hop="global" + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, "bgp") + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : R2 goes for reload ") + + kill_router_daemons(tgen, "r2", ["bgpd"]) + + logger.info( + "[Phase 3] : R2 is still down, restart time 120 sec." + " So time verify the routes are present in BGP RIB" + " and ZEBRA" + ) + + for addr_type in ADDR_TYPES: + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2, preferred_next_hop="global" + ) + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 4] : sleep for {} sec".format(GR_RESTART_TIMER)) + sleep(GR_RESTART_TIMER) + + logger.info("[Phase 5] : Verify the routes from r2 ") + + for addr_type in ADDR_TYPES: + # Verifying BGP RIB routes + next_hop = NEXT_HOP_IP_2[addr_type] + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, expected=False) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + logger.info(" Expected behavior: {}".format(result)) + + # Verifying RIB routes + result = verify_rib( + tgen, addr_type, dut, input_topo, next_hop, "bgp", expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + logger.info(" Expected behavior: {}".format(result)) + + logger.info("[Phase 5] : R2 is about to come up now ") + start_router_daemons(tgen, "r2", ["bgpd"]) + + logger.info("[Phase 5] : R2 is UP Now ! ") + + for addr_type in ADDR_TYPES: + # Verifying GR stats + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r2", peer="r1" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r1", peer="r2") + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2, preferred_next_hop="global" + ) + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_UTP_15_TC_9_p1(request): + """ + Test Objective : Restart BGP router R1, detects it is connected to R2, + which is a helper router. Verify the restart capability i.e. R bit + are sent after R1 reloads and comes back. + + Test Objective : Verify that restarting nodes reset "F" bit while sending + the BGP open messages after it's restarts, when BGP GR is **NOT** enabled. + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Checking router status, starting if not running + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + # reset_config_on_routers(tgen) + + # Create route-map to prefer global next-hop + input_dict = { + "r1": { + "route_maps": { + "rmap_global": [ + {"action": "permit", "set": {"ipv6": {"nexthop": "prefer-global"}}} + ] + } + }, + "r2": { + "route_maps": { + "rmap_global": [ + {"action": "permit", "set": {"ipv6": {"nexthop": "prefer-global"}}} + ] + } + }, + } + result = create_route_maps(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + # Configure neighbor for route map + input_dict_1 = { + "r1": { + "bgp": { + "address_family": { + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": { + "route_maps": [ + { + "name": "rmap_global", + "direction": "in", + } + ] + } + } + } + } + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": { + "route_maps": [ + { + "name": "rmap_global", + "direction": "in", + } + ] + } + } + } + } + } + } + } + } + }, + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + logger.info( + "[Phase 1] : Test Setup " "[Helper Mode]R1-----R2[Restart Mode] initialized " + ) + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2, preferred_next_hop="global" + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : R1 goes for reload ") + + kill_router_daemons(tgen, "r1", ["bgpd"]) + + logger.info("[Phase 6] : R1 is about to come up now ") + start_router_daemons(tgen, "r1", ["bgpd"]) + + for addr_type in ADDR_TYPES: + # Verifying GR stats + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r2", peer="r1") + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2, preferred_next_hop="global" + ) + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r1", peer="r2") + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_f_bit( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_UTP_35_p1(request): + """ + Test Objective : Restart BGP router R1 connected to R2, + which is a restart router. + R1 should not send any GR capability in the open message, + however it would process open message from R2 with GR -restart + capability, but would not perform any BGP GR functionality. + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + logger.info( + "[Phase 1] : Test Setup" " [Disable Mode]R1-----R2[Restart Mode] initialized " + ) + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : R1 goes for reload ") + + kill_router_daemons(tgen, "r1", ["bgpd"]) + + logger.info("[Phase 3] : R1 is about to come up now ") + start_router_daemons(tgen, "r1", ["bgpd"]) + + logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_4_p0(request): + """ + Test Objective : Verify that the restarting node sets "R" bit while sending the + BGP open messages after the node restart, only if GR is enabled. + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + logger.info( + "[Phase 1] : Test Setup" " [Restart Mode]R1-----R2[Helper Mode] initialized " + ) + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : R2 goes for reload ") + + kill_router_daemons(tgen, "r2", ["bgpd"]) + + logger.info( + "[Phase 3] : R2 is still down, restart time {} sec." + "So time verify the routes are present in BGP RIB and ZEBRA ".format( + GR_RESTART_TIMER + ) + ) + + for addr_type in ADDR_TYPES: + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib( + tgen, addr_type, dut, input_topo, next_hop, expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + logger.info(" Expected behavior: {}".format(result)) + + # Verifying RIB routes + result = verify_rib( + tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + logger.info(" Expected behavior: {}".format(result)) + + logger.info("[Phase 5] : R2 is about to come up now ") + start_router_daemons(tgen, "r2", ["bgpd"]) + + logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r1", peer="r2") + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_5_1_2_p1(request): + """ + Test Objective : Verify if restarting node resets R bit in BGP open message + during normal BGP session flaps as well, even when GR restarting mode is enabled. + Here link flap happen due to interface UP/DOWN. + + """ + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + logger.info( + "[Phase 1] : Test Setup" " [Restart Mode]R1-----R2[Restart Mode] initialized " + ) + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : Now flap the link running the BGP session ") + # Shutdown interface + intf = "r2-r1-eth0" + shutdown_bringup_interface(tgen, "r2", intf) + + # Bring up Interface + shutdown_bringup_interface(tgen, dut, intf, ifaceaction=True) + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r1", peer="r2") + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : Restart BGPd on router R2. ") + kill_router_daemons(tgen, "r2", ["bgpd"]) + + start_router_daemons(tgen, "r2", ["bgpd"]) + + logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r1", peer="r2") + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_6_1_2_p1(request): + """ + Test Objective : Verify if restarting node resets R bit in BGP + open message during normal BGP session flaps when GR is disabled. + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + logger.info( + "[Phase 1] : Test Setup" "[Restart Mode]R1-----R2[Helper Mode] initialized " + ) + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 1] : Changing mode" "[Disable Mode]R1-----R2[Helper Mode]") + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + for addr_type in ADDR_TYPES: + clear_bgp(tgen, addr_type, "r1") + clear_bgp(tgen, addr_type, "r2") + + # Verify GR stats + input_dict = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + } + } + }, + } + + # here the verify_graceful_restart fro the neighbor would be + # "NotReceived" as the latest GR config is not yet applied. + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : Now flap the link running the BGP session ") + # Shutdown interface + intf = "r2-r1-eth0" + shutdown_bringup_interface(tgen, "r2", intf) + + # Bring up Interface + shutdown_bringup_interface(tgen, dut, intf, ifaceaction=True) + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_r_bit( + tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("Restart BGPd on R2 ") + kill_router_daemons(tgen, "r2", ["bgpd"]) + + start_router_daemons(tgen, "r2", ["bgpd"]) + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_r_bit( + tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_8_p1(request): + """ + Test Objective : Verify that restarting nodes set "F" bit while sending + the BGP open messages after it restarts, only when BGP GR is enabled. + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + logger.info( + "[Phase 1] : Test Setup" " [Restart Mode]R1-----R2[Restart Mode] initialized " + ) + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "graceful-restart": {"preserve-fw-state": True}, + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + }, + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : R1 goes for reload ") + + kill_router_daemons(tgen, "r1", ["bgpd"]) + + logger.info("[Phase 3] : R1 is about to come up now ") + start_router_daemons(tgen, "r1", ["bgpd"]) + + logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r2", peer="r1") + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_f_bit(tgen, topo, addr_type, input_dict, dut="r2", peer="r1") + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_17_p1(request): + """ + Test Objective : Verify that only GR helper routers keep the stale + route entries, not any GR disabled router. + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + logger.info("[Phase 1] : Test Setup [Disable]R1-----R2[Restart] initialized ") + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart": True, + "preserve-fw-state": True, + }, + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + }, + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : R2 goes for reload ") + + kill_router_daemons(tgen, "r2", ["bgpd"]) + + logger.info( + "[Phase 3] : R2 is still down, restart time 120 sec." + " So time verify the routes are present in BGP RIB and ZEBRA " + ) + + for addr_type in ADDR_TYPES: + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib( + tgen, addr_type, dut, input_topo, next_hop, expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + logger.info(" Expected behavior: {}".format(result)) + + # Verifying RIB routes + result = verify_rib( + tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + logger.info(" Expected behavior: {}".format(result)) + + logger.info("[Phase 5] : R2 is about to come up now ") + start_router_daemons(tgen, "r2", ["bgpd"]) + + logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + result = verify_r_bit( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_19_p1(request): + """ + Test Objective : Verify that GR helper routers keeps all the routes received + from restarting node if both the routers are configured as GR restarting node. + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + logger.info("[Phase 1] : Test Setup [Helper]R1-----R2[Restart] initialized ") + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart": True, + "preserve-fw-state": True, + }, + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + }, + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info( + "[Phase 2] : R1's Gr state cahnge to Graceful" + " Restart without resetting the session " + ) + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + logger.info( + "[Phase 3] : R2 is still down, restart time 120 sec." + " So time verify the routes are present in BGP RIB and ZEBRA " + ) + + for addr_type in ADDR_TYPES: + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_20_p1(request): + """ + Test Objective : Verify that GR helper routers delete all the routes + received from a node if both the routers are configured as GR helper node. + """ + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + logger.info("[Phase 1] : Test Setup [Helper]R1-----R2[Helper] initialized ") + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "graceful-restart": { + "graceful-restart": True, + "preserve-fw-state": True, + }, + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + }, + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + kill_router_daemons(tgen, "r2", ["bgpd"]) + + for addr_type in ADDR_TYPES: + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib( + tgen, addr_type, dut, input_topo, next_hop, expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + logger.info(" Expected behavior: {}".format(result)) + + # Verifying RIB routes + result = verify_rib( + tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + logger.info(" Expected behavior: {}".format(result)) + + logger.info("[Phase 5] : R2 is about to come up now ") + + start_router_daemons(tgen, "r2", ["bgpd"]) + + logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ") + + for addr_type in ADDR_TYPES: + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_31_1_p1(request): + """ + After BGP neighborship is established and GR capability is exchanged, + transition restarting router to disabled state and vice versa. + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + logger.info( + "[Phase 1] : Test Setup" " [Helper Mode]R2-----R1[Restart Mode] initialized " + ) + + # Configure graceful-restart + input_dict = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + "r1": { + "bgp": { + "graceful-restart": {"preserve-fw-state": True}, + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + }, + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + protocol = "bgp" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : R1 Goes from Restart to Disable Mode ") + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + for addr_type in ADDR_TYPES: + clear_bgp(tgen, addr_type, "r1") + clear_bgp(tgen, addr_type, "r2") + + # Verify GR stats + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + } + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : R1 goes for reload ") + + kill_router_daemons(tgen, "r1", ["bgpd"]) + + logger.info( + "[Phase 3] : R1 is still down, restart time 120 sec." + " So time verify the routes are not present in BGP RIB and ZEBRA" + ) + + for addr_type in ADDR_TYPES: + # Verifying RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_rib( + tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False + ) + assert result is not True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 4] : R1 is about to come up now ") + start_router_daemons(tgen, "r1", ["bgpd"]) + + logger.info("[Phase 5] : R1 is UP now, so time to collect GR stats ") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +def test_BGP_GR_TC_31_2_p1(request): + """ + After BGP neighborship is established and GR capability is exchanged, + transition restarting router to disabled state and vice versa. + """ + + tgen = get_topogen() + tc_name = request.node.name + write_test_header(tc_name) + + # Check router status + check_router_status(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + logger.info( + "[Phase 1] : Test Setup " "[Disable Mode]R1-----R2[Restart Mode] initialized " + ) + + # Configure graceful-restart + input_dict = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart-disable": True} + } + } + } + } + }, + } + } + }, + } + + configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + dut = "r1" + peer = "r2" + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + protocol = "bgp" + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 2] : R2 Goes from Disable to Restart Mode ") + + # Configure graceful-restart + input_dict = { + "r1": { + "bgp": { + "graceful-restart": {"preserve-fw-state": True}, + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + }, + } + } + } + + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + for addr_type in ADDR_TYPES: + clear_bgp(tgen, addr_type, "r1") + clear_bgp(tgen, addr_type, "r2") + + # Verify GR stats + input_dict = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {"graceful-restart-helper": True} + } + } + } + } + }, + } + } + }, + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {"graceful-restart": True} + } + } + } + } + }, + } + } + }, + } + + # here the verify_graceful_restart fro the neighbor would be + # "NotReceived" as the latest GR config is not yet applied. + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 3] : R1 goes for reload ") + + kill_router_daemons(tgen, "r1", ["bgpd"]) + + logger.info( + "[Phase 4] : R1 is still down, restart time 120 sec." + " So time verify the routes are present in BGP RIB and ZEBRA " + ) + + for addr_type in ADDR_TYPES: + # Verifying RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + logger.info("[Phase 6] : R1 is about to come up now ") + start_router_daemons(tgen, "r1", ["bgpd"]) + + logger.info("[Phase 4] : R1 is UP now, so time to collect GR stats ") + + for addr_type in ADDR_TYPES: + result = verify_graceful_restart( + tgen, topo, addr_type, input_dict, dut="r1", peer="r2" + ) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying BGP RIB routes + next_hop = next_hop_per_address_family( + tgen, dut, peer, addr_type, NEXT_HOP_IP_2 + ) + input_topo = {key: topo["routers"][key] for key in ["r2"]} + result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + # Verifying RIB routes + result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol) + assert result is True, "Testcase {} : Failed \n Error {}".format( + tc_name, result + ) + + write_test_footer(tc_name) + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index 5fb07b7377..2dd90e9a86 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -23,6 +23,7 @@ from time import sleep import traceback import ipaddr import os +import sys from lib import topotest from lib.topolog import logger @@ -35,13 +36,15 @@ from lib.common_config import ( generate_ips, validate_ip_address, find_interface_with_greater_ip, - run_frr_cmd, FRRCFG_FILE, + run_frr_cmd, + FRRCFG_FILE, retry, ) LOGDIR = "/tmp/topotests/" TMPDIR = None + def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True): """ API to configure bgp on router @@ -64,9 +67,9 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True "graceful-restart": True, "preserve-fw-state": True, "timer": { - "restart-time": 300, - "rib-stale-time": 300, - "select-defer-time": 300, + "restart-time": 30, + "rib-stale-time": 30, + "select-defer-time": 30, } }, "address_family": { @@ -93,21 +96,21 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True "dest_link": { "r4": { "allowas-in": { - "number_occurences":2 + "number_occurences": 2 }, - "graceful-restart": True", "prefix_lists": [ { "name": "pf_list_1", "direction": "in" } ], - "route_maps": [ - {"name": "RMAP_MED_R3", - "direction": "in"} - ], + "route_maps": [{ + "name": "RMAP_MED_R3", + "direction": "in" + }], "next_hop_self": True - } + }, + "r1": {"graceful-restart-helper": True} } } } @@ -123,7 +126,7 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True ------- True or False """ - logger.debug("Entering lib API: create_router_bgp()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False # Flag is used when testing ipv6 over ipv4 or vice-versa @@ -165,8 +168,12 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True if neigh_unicast: data_all_bgp = __create_bgp_unicast_neighbor( - tgen, topo, input_dict, router, afi_test, - config_data=data_all_bgp + tgen, + topo, + input_dict, + router, + afi_test, + config_data=data_all_bgp, ) try: @@ -179,7 +186,7 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: create_router_bgp()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result @@ -199,7 +206,7 @@ def __create_bgp_global(tgen, input_dict, router, build=False): True or False """ - logger.debug("Entering lib API: __create_bgp_global()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) bgp_data = input_dict[router]["bgp"] del_bgp_action = bgp_data.setdefault("delete", False) @@ -223,8 +230,6 @@ def __create_bgp_global(tgen, input_dict, router, build=False): cmd = "{} vrf {}".format(cmd, vrf_id) config_data.append(cmd) - - # Skip RFC8212 in topotests config_data.append("no bgp ebgp-requires-policy") router_id = bgp_data.setdefault("router_id", None) @@ -300,12 +305,13 @@ def __create_bgp_global(tgen, input_dict, router, build=False): config_data.append(cmd) - logger.debug("Exiting lib API: create_bgp_global()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return config_data -def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, afi_test, - config_data=None): +def __create_bgp_unicast_neighbor( + tgen, topo, input_dict, router, afi_test, config_data=None +): """ Helper API to create configuration for address-family unicast @@ -319,7 +325,7 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, afi_test, * `build` : Only for initial setup phase this is set as True. """ - logger.debug("Entering lib API: __create_bgp_unicast_neighbor()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) add_neigh = True if "router bgp" in config_data: @@ -428,7 +434,7 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, afi_test, config_data.extend(neigh_addr_data) - logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return config_data @@ -445,7 +451,7 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): """ config_data = [] - logger.debug("Entering lib API: __create_bgp_neighbor()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) bgp_data = input_dict[router]["bgp"]["address_family"] neigh_data = bgp_data[addr_type]["unicast"]["neighbor"] @@ -520,7 +526,7 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): ) config_data.append("{} enforce-multihop".format(neigh_cxt)) - logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return config_data @@ -541,7 +547,7 @@ def __create_bgp_unicast_address_family( """ config_data = [] - logger.debug("Entering lib API: __create_bgp_unicast_neighbor()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) bgp_data = input_dict[router]["bgp"]["address_family"] neigh_data = bgp_data[addr_type]["unicast"]["neighbor"] @@ -573,26 +579,23 @@ def __create_bgp_unicast_address_family( ] neigh_cxt = "neighbor {}".format(ip_addr) - config_data.append("address-family {} unicast".format( - addr_type - )) + config_data.append("address-family {} unicast".format(addr_type)) if activate_addr_family is not None: - config_data.append("address-family {} unicast".format( - activate_addr_family)) - config_data.append( - "{} activate".format(neigh_cxt)) + "address-family {} unicast".format(activate_addr_family) + ) + + config_data.append("{} activate".format(neigh_cxt)) if deactivate and activate_addr_family is None: - config_data.append( - "no neighbor {} activate".format(deactivate)) + config_data.append("no neighbor {} activate".format(deactivate)) if deactivate_addr_family is not None: - config_data.append("address-family {} unicast".format( - deactivate_addr_family)) config_data.append( - "no {} activate".format(neigh_cxt)) + "address-family {} unicast".format(deactivate_addr_family) + ) + config_data.append("no {} activate".format(neigh_cxt)) next_hop_self = peer.setdefault("next_hop_self", None) send_community = peer.setdefault("send_community", None) @@ -722,7 +725,7 @@ def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict): """ - logger.debug("Entering lib API: modify_bgp_config_when_bgpd_down()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) try: global LOGDIR @@ -759,7 +762,7 @@ def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: modify_bgp_config_when_bgpd_down") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -801,7 +804,7 @@ def verify_router_id(tgen, topo, input_dict): errormsg(str) or True """ - logger.debug("Entering lib API: verify_router_id()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router in input_dict.keys(): if router not in tgen.routers(): continue @@ -832,7 +835,7 @@ def verify_router_id(tgen, topo, input_dict): ) return errormsg - logger.debug("Exiting lib API: verify_router_id()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -857,7 +860,7 @@ def verify_bgp_convergence(tgen, topo): errormsg(str) or True """ - logger.debug("Entering lib API: verify_bgp_convergence()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router, rnode in tgen.routers().iteritems(): if "bgp" not in topo["routers"][router]: continue @@ -939,7 +942,7 @@ def modify_as_number(tgen, topo, input_dict): errormsg(str) or True """ - logger.debug("Entering lib API: modify_as_number()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) try: new_topo = deepcopy(topo["routers"]) @@ -966,8 +969,7 @@ def modify_as_number(tgen, topo, input_dict): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: modify_as_number()") - + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -1001,7 +1003,7 @@ def verify_as_numbers(tgen, topo, input_dict): errormsg(str) or True """ - logger.debug("Entering lib API: verify_as_numbers()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router in input_dict.keys(): if router not in tgen.routers(): continue @@ -1066,10 +1068,47 @@ def verify_as_numbers(tgen, topo, input_dict): remote_as, ) - logger.debug("Exiting lib API: verify_AS_numbers()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True +def clear_bgp(tgen, addr_type, router, vrf=None): + """ + This API is to clear bgp neighborship by running + clear ip bgp */clear bgp ipv6 * command, + + Parameters + ---------- + * `tgen`: topogen object + * `addr_type`: ip type ipv4/ipv6 + * `router`: device under test + * `vrf`: vrf name + + Usage + ----- + clear_bgp(tgen, addr_type, "r1") + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + if router not in tgen.routers(): + return False + + rnode = tgen.routers()[router] + + if vrf: + if type(vrf) is not list: + vrf = [vrf] + + # Clearing BGP + logger.info("Clearing BGP neighborship for router %s..", router) + if addr_type == "ipv4": + run_frr_cmd(rnode, "clear ip bgp *") + elif addr_type == "ipv6": + run_frr_cmd(rnode, "clear bgp ipv6 *") + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + def clear_bgp_and_verify(tgen, topo, router): """ This API is to clear bgp neighborship and verify bgp neighborship @@ -1093,7 +1132,7 @@ def clear_bgp_and_verify(tgen, topo, router): errormsg(str) or True """ - logger.debug("Entering lib API: clear_bgp_and_verify()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) if router not in tgen.routers(): return False @@ -1264,7 +1303,7 @@ def clear_bgp_and_verify(tgen, topo, router): ) return errormsg - logger.debug("Exiting lib API: clear_bgp_and_verify()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -1300,7 +1339,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): errormsg(str) or True """ - logger.debug("Entering lib API: verify_bgp_timers_and_functionality()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) sleep(5) router_list = tgen.routers() for router in input_dict.keys(): @@ -1507,7 +1546,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict): bgp_neighbor, ) - logger.debug("Exiting lib API: verify_bgp_timers_and_functionality()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -1571,7 +1610,7 @@ def verify_bgp_attributes( errormsg(str) or True """ - logger.debug("Entering lib API: verify_bgp_attributes()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router, rnode in tgen.routers().iteritems(): if router != dut: continue @@ -1646,7 +1685,7 @@ def verify_bgp_attributes( ) return errormsg - logger.debug("Exiting lib API: verify_bgp_attributes()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -1701,7 +1740,7 @@ def verify_best_path_as_per_bgp_attribute( errormsg(str) or True """ - logger.debug("Entering lib API: verify_best_path_as_per_bgp_attribute()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) if router not in tgen.routers(): return False @@ -1811,7 +1850,7 @@ def verify_best_path_as_per_bgp_attribute( router, ) - logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -1850,7 +1889,7 @@ def verify_best_path_as_per_admin_distance( errormsg(str) or True """ - logger.debug("Entering lib API: verify_best_path_as_per_admin_distance()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) router_list = tgen.routers() if router not in router_list: return False @@ -1922,11 +1961,11 @@ def verify_best_path_as_per_admin_distance( router, ) - logger.info("Exiting lib API: verify_best_path_as_per_admin_distance()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True -@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +@retry(attempts=10, wait=2, return_is_str=True, initial_wait=2) def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None): """ This API is to verify whether bgp rib has any @@ -1956,7 +1995,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None) errormsg(str) or True """ - logger.debug("Entering lib API: verify_bgp_rib()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) router_list = tgen.routers() additional_nexthops_in_required_nhs = [] @@ -2171,7 +2210,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None) "routes are: {}\n".format(dut, found_routes) ) - logger.debug("Exiting lib API: verify_bgp_rib()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -2234,7 +2273,7 @@ def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer): errormsg(str) or True """ - logger.debug("Entering lib API: verify_graceful_restart()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router, rnode in tgen.routers().iteritems(): if router != dut: @@ -2273,8 +2312,6 @@ def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer): isjson=True, ) - logger.info("show_bgp_graceful_json {}".format(show_bgp_graceful_json)) - show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip] if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip: @@ -2422,7 +2459,7 @@ def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer): ) return errormsg - logger.debug("Exiting lib API: verify_graceful_restart()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -2484,7 +2521,7 @@ def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer): errormsg(str) or True """ - logger.debug("Entering lib API: verify_r_bit()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router, rnode in tgen.routers().iteritems(): if router != dut: @@ -2540,11 +2577,11 @@ def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer): errormsg = "[DUT: {}]: Rbit false {}".format(dut, neighbor_ip) return errormsg - logger.debug("Exiting lib API: verify_r_bit()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True -@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2) def verify_eor(tgen, topo, addr_type, input_dict, dut, peer): """ This API is to verify EOR @@ -2602,7 +2639,7 @@ def verify_eor(tgen, topo, addr_type, input_dict, dut, peer): ------- errormsg(str) or True """ - logger.debug("Entering lib API: verify_eor()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router, rnode in tgen.routers().iteritems(): if router != dut: @@ -2651,58 +2688,59 @@ def verify_eor(tgen, topo, addr_type, input_dict, dut, peer): ) return errormsg - for afi, afi_data in show_bgp_graceful_json_out.items(): - if "v4" not in afi and "v6" not in afi: - continue + if addr_type == "ipv4": + afi = "ipv4Unicast" + elif addr_type == "ipv6": + afi = "ipv6Unicast" + else: + errormsg = "Address type %s is not supported" % (addr_type) + return errormsg - eor_json = afi_data["endOfRibStatus"] - if "endOfRibSend" in eor_json: + eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"] + if "endOfRibSend" in eor_json: - if eor_json["endOfRibSend"]: - logger.info( - "[DUT: %s]: EOR Send true for %s " "%s", - dut, - neighbor_ip, - afi, - ) - else: - errormsg = "[DUT: %s]: EOR Send false for %s" " %s" % ( - dut, - neighbor_ip, - afi, - ) - return errormsg + if eor_json["endOfRibSend"]: + logger.info( + "[DUT: %s]: EOR Send true for %s " "%s", dut, neighbor_ip, afi + ) + else: + errormsg = "[DUT: %s]: EOR Send false for %s" " %s" % ( + dut, + neighbor_ip, + afi, + ) + return errormsg - if "endOfRibRecv" in eor_json: - if eor_json["endOfRibRecv"]: - logger.info( - "[DUT: %s]: EOR Recv true %s " "%s", dut, neighbor_ip, afi - ) - else: - errormsg = "[DUT: %s]: EOR Recv false %s " "%s" % ( - dut, - neighbor_ip, - afi, - ) - return errormsg + if "endOfRibRecv" in eor_json: + if eor_json["endOfRibRecv"]: + logger.info( + "[DUT: %s]: EOR Recv true %s " "%s", dut, neighbor_ip, afi + ) + else: + errormsg = "[DUT: %s]: EOR Recv false %s " "%s" % ( + dut, + neighbor_ip, + afi, + ) + return errormsg - if "endOfRibSentAfterUpdate" in eor_json: - if eor_json["endOfRibSentAfterUpdate"]: - logger.info( - "[DUT: %s]: EOR SendTime true for %s" " %s", - dut, - neighbor_ip, - afi, - ) - else: - errormsg = "[DUT: %s]: EOR SendTime false for " "%s %s" % ( - dut, - neighbor_ip, - afi, - ) - return errormsg + if "endOfRibSentAfterUpdate" in eor_json: + if eor_json["endOfRibSentAfterUpdate"]: + logger.info( + "[DUT: %s]: EOR SendTime true for %s" " %s", + dut, + neighbor_ip, + afi, + ) + else: + errormsg = "[DUT: %s]: EOR SendTime false for " "%s %s" % ( + dut, + neighbor_ip, + afi, + ) + return errormsg - logger.debug("Exiting lib API: verify_eor()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -2766,7 +2804,7 @@ def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer): errormsg(str) or True """ - logger.debug("Entering lib API: verify_f_bit()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router, rnode in tgen.routers().iteritems(): if router != dut: @@ -2815,21 +2853,34 @@ def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer): ) return errormsg - for afi, afi_data in show_bgp_graceful_json_out.items(): - if "v4" not in afi and "v6" not in afi: - continue + if "ipv4Unicast" in show_bgp_graceful_json_out: + if show_bgp_graceful_json_out["ipv4Unicast"]["fBit"]: + logger.info( + "[DUT: {}]: Fbit True for {} IPv4" + " Unicast".format(dut, neighbor_ip) + ) + else: + errormsg = "[DUT: {}]: Fbit False for {} IPv4" " Unicast".format( + dut, neighbor_ip + ) + return errormsg - if afi_data["fBit"]: + elif "ipv6Unicast" in show_bgp_graceful_json_out: + if show_bgp_graceful_json_out["ipv6Unicast"]["fBit"]: logger.info( - "[DUT: {}]: Fbit True for {} {}".format(dut, neighbor_ip, afi) + "[DUT: {}]: Fbit True for {} IPv6" + " Unicast".format(dut, neighbor_ip) ) else: - errormsg = "[DUT: {}]: Fbit False for {} {}".format( - dut, neighbor_ip, afi + errormsg = "[DUT: {}]: Fbit False for {} IPv6" " Unicast".format( + dut, neighbor_ip ) return errormsg + else: + show_bgp_graceful_json_out["ipv4Unicast"] + show_bgp_graceful_json_out["ipv6Unicast"] - logger.debug("Exiting lib API: verify_f_bit()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -2879,7 +2930,7 @@ def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer) errormsg(str) or True """ - logger.debug("Entering lib API: verify_graceful_restart_timers()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router, rnode in tgen.routers().iteritems(): if router != dut: @@ -2955,7 +3006,7 @@ def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer) ) return errormsg - logger.debug("Exiting lib API: verify_graceful_restart_timers") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -2970,20 +3021,20 @@ def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut): * `tgen`: topogen object * `topo`: input json file data * `addr_type` : ip type ipv4/ipv6 - * `addr_family` : address family type IPV4 Unicast/IPV6 Unicast + * `addr_type` : ip type IPV4 Unicast/IPV6 Unicast * `dut`: input dut router name Usage ----- - result = verify_gr_address_family(tgen, topo, addr_type, addr_family, - , dut) + + result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1") Returns ------- errormsg(str) or True """ - logger.debug("Entering lib API: verify_gr_address_family()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router, rnode in tgen.routers().iteritems(): if router != dut: @@ -3025,19 +3076,25 @@ def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut): errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip) return errormsg - if "v4" in addr_family: - input_afi = "v4" - elif "v6" in addr_family: - input_afi = "v6" - - for afi in show_bgp_graceful_json_out.keys(): - if input_afi not in afi: - continue + if addr_family == "ipv4Unicast": + if "ipv4Unicast" in show_bgp_graceful_json_out: + logger.info("ipv4Unicast present for {} ".format(neighbor_ip)) + return True else: - logger.info("{} present for {} ".format(addr_family, neighbor_ip)) + errormsg = "ipv4Unicast NOT present for {} ".format(neighbor_ip) + return errormsg + + elif addr_family == "ipv6Unicast": + if "ipv6Unicast" in show_bgp_graceful_json_out: + logger.info("ipv6Unicast present for {} ".format(neighbor_ip)) return True + else: + errormsg = "ipv6Unicast NOT present for {} ".format(neighbor_ip) + return errormsg else: - errormsg = "{} NOT present for {} ".format(addr_family, neighbor_ip) + errormsg = "Aaddress family: {} present for {} ".format( + addr_family, neighbor_ip + ) return errormsg - logger.debug("Exiting lib API: verify_gr_address_family()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index 72ee28c1c6..0b19877aff 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -32,6 +32,7 @@ from tempfile import mkdtemp import StringIO import os +import sys import ConfigParser import traceback import socket @@ -100,13 +101,11 @@ SEQ_ID = {"prefix_lists": {}, "route_maps": {}} def get_seq_id(obj_type, router, obj_name): """ Generates and saves sequence number in interval of 10 - Parameters ---------- * `obj_type`: prefix_lists or route_maps * `router`: router name *` obj_name`: name of the prefix-list or route-map - Returns -------- Sequence number generated @@ -125,7 +124,6 @@ def get_seq_id(obj_type, router, obj_name): def set_seq_id(obj_type, router, id, obj_name): """ Saves sequence number if not auto-generated and given by user - Parameters ---------- * `obj_type`: prefix_lists or route_maps @@ -149,11 +147,9 @@ class InvalidCLIError(Exception): def run_frr_cmd(rnode, cmd, isjson=False): """ Execute frr show commands in priviledged mode - * `rnode`: router node on which commands needs to executed * `cmd`: Command to be executed on frr * `isjson`: If command is to get json data or not - :return str: """ @@ -179,12 +175,13 @@ def run_frr_cmd(rnode, cmd, isjson=False): raise InvalidCLIError("No actual cmd passed") -def create_common_configuration(tgen, router, data, config_type=None, build=False, load_config=True): +def create_common_configuration( + tgen, router, data, config_type=None, build=False, load_config=True +): """ API to create object of class FRRConfig and also create frr_json.conf file. It will create interface and common configurations and save it to frr_json.conf and load to router - Parameters ---------- * `tgen`: tgen onject @@ -193,7 +190,6 @@ def create_common_configuration(tgen, router, data, config_type=None, build=Fals * `config_type` : Syntactic information while writing configuration. Should be one of the value as mentioned in the config_map below. * `build` : Only for initial setup phase this is set as True - Returns ------- True or False @@ -248,13 +244,12 @@ def kill_router_daemons(tgen, router, daemons): """ Router's current config would be saved to /etc/frr/ for each deamon and deamon would be killed forcefully using SIGKILL. - * `tgen` : topogen object * `router`: Device under test * `daemons`: list of daemons to be killed """ - logger.debug("Entering lib API: kill_router_daemons()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) try: router_list = tgen.routers() @@ -278,20 +273,18 @@ def kill_router_daemons(tgen, router, daemons): def start_router_daemons(tgen, router, daemons): """ Daemons defined by user would be started - * `tgen` : topogen object * `router`: Device under test * `daemons`: list of daemons to be killed """ - logger.debug("Entering lib API: start_router_daemons()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) try: router_list = tgen.routers() # Start daemons result = router_list[router].startDaemons(daemons) - sleep(5) return result except Exception as e: @@ -299,14 +292,13 @@ def start_router_daemons(tgen, router, daemons): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: start_router_daemons()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True def kill_mininet_routers_process(tgen): """ Kill all mininet stale router' processes - * `tgen` : topogen object """ @@ -331,11 +323,10 @@ def kill_mininet_routers_process(tgen): def check_router_status(tgen): """ Check if all daemons are running for all routers in topology - * `tgen` : topogen object """ - logger.debug("Entering lib API: start_router_daemons()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) try: router_list = tgen.routers() @@ -356,7 +347,7 @@ def check_router_status(tgen): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: start_router_daemons()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -364,7 +355,6 @@ def reset_config_on_routers(tgen, routerName=None): """ Resets configuration on routers to the snapshot created using input JSON file. It replaces existing router configuration with FRRCFG_BKUP_FILE - Parameters ---------- * `tgen` : Topogen object @@ -480,7 +470,6 @@ def reset_config_on_routers(tgen, routerName=None): def load_config_to_router(tgen, routerName, save_bkup=False): """ Loads configuration on router from the file FRRCFG_FILE. - Parameters ---------- * `tgen` : Topogen object @@ -536,16 +525,13 @@ def get_frr_ipv6_linklocal(tgen, router, intf=None): """ API to get the link local ipv6 address of a perticular interface using FRR command 'show interface' - * `tgen`: tgen onject * `router` : router for which hightest interface should be calculated * `intf` : interface for which linklocal address needs to be taken - Usage ----- linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A) - Returns ------- 1) array of interface names to link local ips. @@ -675,11 +661,9 @@ def number_to_column(routerName): def validate_ip_address(ip_address): """ Validates the type of ip address - Parameters ---------- * `ip_address`: IPv4/IPv6 address - Returns ------- Type of address as string @@ -746,7 +730,6 @@ def generate_ips(network, no_of_ips): """ Returns list of IPs. based on start_ip and no_of_ips - * `network` : from here the ip will start generating, start_ip will be * `no_of_ips` : these many IPs will be generated @@ -787,7 +770,6 @@ def find_interface_with_greater_ip(topo, router, loopback=True, interface=True): Returns highest interface ip for ipv4/ipv6. If loopback is there then it will return highest IP from loopback IPs otherwise from physical interface IPs. - * `topo` : json file data * `router` : router for which hightest interface should be calculated """ @@ -835,11 +817,9 @@ def write_test_footer(tc_name): def interface_status(tgen, topo, input_dict): """ Delete ip route maps from device - * `tgen` : Topogen object * `topo` : json file data * `input_dict` : for which router, route map has to be deleted - Usage ----- input_dict = { @@ -852,7 +832,7 @@ def interface_status(tgen, topo, input_dict): ------- errormsg(str) or True """ - logger.debug("Entering lib API: interface_status()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) try: global frr_cfg @@ -876,19 +856,17 @@ def interface_status(tgen, topo, input_dict): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: interface_status()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0): """ Retries function execution, if return is an errormsg or exception - * `attempts`: Number of attempts to make * `wait`: Number of seconds to wait between each attempt * `return_is_str`: Return val is an errormsg in case of failure * `initial_wait`: Sleeps for this much seconds before executing function - """ def _retry(func): @@ -967,13 +945,11 @@ def create_interfaces_cfg(tgen, topo, build=False): """ Create interface configuration for created topology. Basic Interface configuration is provided in input json file. - Parameters ---------- * `tgen` : Topogen object * `topo` : json file data * `build` : Only for initial setup phase this is set as True. - Returns ------- True or False @@ -1012,13 +988,11 @@ def create_interfaces_cfg(tgen, topo, build=False): def create_static_routes(tgen, input_dict, build=False): """ Create static routes for given router as defined in input_dict - Parameters ---------- * `tgen` : Topogen object * `input_dict` : Input dict data, required when configuring from testcase * `build` : Only for initial setup phase this is set as True. - Usage ----- input_dict should be in the format below: @@ -1029,7 +1003,6 @@ def create_static_routes(tgen, input_dict, build=False): # next_hop: starting next-hop address # tag: tag id for static routes # delete: True if config to be removed. Default False. - Example: "routers": { "r1": { @@ -1045,13 +1018,12 @@ def create_static_routes(tgen, input_dict, build=False): ] } } - Returns ------- errormsg(str) or True """ result = False - logger.debug("Entering lib API: create_static_routes()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) input_dict = deepcopy(input_dict) try: for router in input_dict.keys(): @@ -1108,7 +1080,7 @@ def create_static_routes(tgen, input_dict, build=False): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: create_static_routes()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result @@ -1116,13 +1088,11 @@ def create_prefix_lists(tgen, input_dict, build=False): """ Create ip prefix lists as per the config provided in input JSON or input_dict - Parameters ---------- * `tgen` : Topogen object * `input_dict` : Input dict data, required when configuring from testcase * `build` : Only for initial setup phase this is set as True. - Usage ----- # pf_lists_1: name of prefix-list, user defined @@ -1131,7 +1101,6 @@ def create_prefix_lists(tgen, input_dict, build=False): # action: permit/deny # le: less than or equal number of bits # ge: greater than or equal number of bits - Example ------- input_dict = { @@ -1152,13 +1121,12 @@ def create_prefix_lists(tgen, input_dict, build=False): } } } - Returns ------- errormsg or True """ - logger.debug("Entering lib API: create_prefix_lists()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) result = False try: for router in input_dict.keys(): @@ -1217,20 +1185,18 @@ def create_prefix_lists(tgen, input_dict, build=False): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: create_prefix_lists()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result def create_route_maps(tgen, input_dict, build=False): """ Create route-map on the devices as per the arguments passed - Parameters ---------- * `tgen` : Topogen object * `input_dict` : Input dict data, required when configuring from testcase * `build` : Only for initial setup phase this is set as True. - Usage ----- # route_maps: key, value pair for route-map name and its attribute @@ -1251,7 +1217,6 @@ def create_route_maps(tgen, input_dict, build=False): # large_community: large community value to be attached # community_additive: if set to "additive", adds community/large-community value to the existing values of the network prefix - Example: -------- input_dict = { @@ -1267,7 +1232,6 @@ def create_route_maps(tgen, input_dict, build=False): "ipv6": { "prefix_list": "pf_list_1" } - "large-community-list": { "id": "community_1", "exact_match": True @@ -1300,14 +1264,13 @@ def create_route_maps(tgen, input_dict, build=False): } } } - Returns ------- errormsg(str) or True """ result = False - logger.debug("Entering lib API: create_route_maps()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) input_dict = deepcopy(input_dict) try: for router in input_dict.keys(): @@ -1572,18 +1535,16 @@ def create_route_maps(tgen, input_dict, build=False): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: create_route_maps()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result def delete_route_maps(tgen, input_dict): """ Delete ip route maps from device - * `tgen` : Topogen object * `input_dict` : for which router, route map has to be deleted - Usage ----- # Delete route-map rmap_1 and rmap_2 from router r1 @@ -1593,12 +1554,11 @@ def delete_route_maps(tgen, input_dict): } } result = delete_route_maps("ipv4", input_dict) - Returns ------- errormsg(str) or True """ - logger.info("Entering lib API: delete_route_maps()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router in input_dict.keys(): route_maps = input_dict[router]["route_maps"][:] @@ -1614,7 +1574,6 @@ def create_bgp_community_lists(tgen, input_dict, build=False): """ Create bgp community-list or large-community-list on the devices as per the arguments passed. Takes list of communities in input. - Parameters ---------- * `tgen` : Topogen object @@ -1640,7 +1599,7 @@ def create_bgp_community_lists(tgen, input_dict, build=False): """ result = False - logger.debug("Entering lib API: create_bgp_community_lists()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) input_dict = deepcopy(input_dict) try: for router in input_dict.keys(): @@ -1696,30 +1655,26 @@ def create_bgp_community_lists(tgen, input_dict, build=False): logger.error(errormsg) return errormsg - logger.debug("Exiting lib API: create_bgp_community_lists()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False): """ Shutdown or bringup router's interface " - * `tgen` : Topogen object * `dut` : Device under test * `intf_name` : Interface name to be shut/no shut * `ifaceaction` : Action, to shut/no shut interface, by default is False - Usage ----- dut = "r3" intf = "r3-r1-eth0" # Shut down ineterface shutdown_bringup_interface(tgen, dut, intf, False) - # Bring up ineterface shutdown_bringup_interface(tgen, dut, intf, True) - Returns ------- errormsg(str) or True @@ -1745,7 +1700,6 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): advertise_networks_using_network_command() and do will verify next_hop and each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json" command o/p. - Parameters ---------- * `tgen` : topogen object @@ -1755,14 +1709,12 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): * `next_hop`[optional]: next_hop which needs to be verified, default: static * `protocol`[optional]: protocol, default = None - Usage ----- # RIB can be verified for static routes OR network advertised using network command. Following are input_dicts to create static routes and advertise networks using network command. Any one of the input_dict can be passed to verify_rib() to verify routes in DUT"s RIB. - # Creating static routes for r1 input_dict = { "r1": { @@ -1780,13 +1732,12 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): dut = "r2" protocol = "bgp" result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol) - Returns ------- errormsg(str) or True """ - logger.debug("Entering lib API: verify_rib()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) router_list = tgen.routers() for routerInput in input_dict.keys(): @@ -1820,10 +1771,9 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): static_routes = input_dict[routerInput]["static_routes"] st_found = False nh_found = False + found_routes = [] + missing_routes = [] for static_route in static_routes: - found_routes = [] - missing_routes = [] - network = static_route["network"] if "no_of_ip" in static_route: no_of_ip = static_route["no_of_ip"] @@ -1923,7 +1873,9 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): next_hop = [next_hop] for nh in next_hop: - for nh_json in rib_routes_json[st_rt][0]["nexthops"]: + for nh_json in rib_routes_json[st_rt][0][ + "nexthops" + ]: if nh != nh_json["ip"]: continue nh_found = True @@ -1959,7 +1911,7 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): " routes are: {}\n".format(addr_type, dut, found_routes) ) - logger.debug("Exiting lib API: verify_rib()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -1967,7 +1919,6 @@ def verify_admin_distance_for_static_routes(tgen, input_dict): """ API to verify admin distance for static routes as defined in input_dict/ input JSON by running show ip/ipv6 route json command. - Parameter --------- * `tgen` : topogen object @@ -1987,13 +1938,12 @@ def verify_admin_distance_for_static_routes(tgen, input_dict): } } result = verify_admin_distance_for_static_routes(tgen, input_dict) - Returns ------- errormsg(str) or True """ - logger.debug("Entering lib API: verify_admin_distance_for_static_routes()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router in input_dict.keys(): if router not in tgen.routers(): @@ -2050,7 +2000,7 @@ def verify_admin_distance_for_static_routes(tgen, input_dict): ) return errormsg - logger.debug("Exiting lib API: verify_admin_distance_for_static_routes()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -2058,12 +2008,10 @@ def verify_prefix_lists(tgen, input_dict): """ Running "show ip prefix-list" command and verifying given prefix-list is present in router. - Parameters ---------- * `tgen` : topogen object * `input_dict`: data to verify prefix lists - Usage ----- # To verify pf_list_1 is present in router r1 @@ -2072,13 +2020,12 @@ def verify_prefix_lists(tgen, input_dict): "prefix_lists": ["pf_list_1"] }} result = verify_prefix_lists("ipv4", input_dict, tgen) - Returns ------- errormsg(str) or True """ - logger.debug("Entering lib API: verify_prefix_lists()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router in input_dict.keys(): if router not in tgen.routers(): @@ -2109,7 +2056,7 @@ def verify_prefix_lists(tgen, input_dict): router, ) - logger.debug("Exiting lib API: verify_prefix_lists()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -2136,7 +2083,7 @@ def verify_route_maps(tgen, input_dict): errormsg(str) or True """ - logger.debug("Entering lib API: verify_route_maps()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router in input_dict.keys(): if router not in tgen.routers(): @@ -2161,7 +2108,7 @@ def verify_route_maps(tgen, input_dict): router, ) - logger.debug("Exiting lib API: verify_route_maps()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -2170,7 +2117,6 @@ def verify_bgp_community(tgen, addr_type, router, network, input_dict=None): """ API to veiryf BGP large community is attached in route for any given DUT by running "show bgp ipv4/6 {route address} json" command. - Parameters ---------- * `tgen`: topogen object @@ -2186,13 +2132,12 @@ def verify_bgp_community(tgen, addr_type, router, network, input_dict=None): "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5" } result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None) - Returns ------- errormsg(str) or True """ - logger.info("Entering lib API: verify_bgp_community()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) if router not in tgen.routers(): return False @@ -2254,7 +2199,7 @@ def verify_bgp_community(tgen, addr_type, router, network, input_dict=None): ) return errormsg - logger.debug("Exiting lib API: verify_bgp_community()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True @@ -2283,7 +2228,7 @@ def verify_create_community_list(tgen, input_dict): errormsg(str) or True """ - logger.debug("Entering lib API: verify_create_community_list()") + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) for router in input_dict.keys(): if router not in tgen.routers(): @@ -2311,5 +2256,5 @@ def verify_create_community_list(tgen, input_dict): ) return errormsg - logger.debug("Exiting lib API: verify_create_community_list()") + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True |
