summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/topotests/bgp_gr_functionality_topo1/bgp_gr_topojson_topo1.json115
-rwxr-xr-xtests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py3495
-rw-r--r--tests/topotests/lib/bgp.py341
-rw-r--r--tests/topotests/lib/common_config.py129
4 files changed, 3846 insertions, 234 deletions
diff --git a/tests/topotests/bgp_gr_functionality_topo1/bgp_gr_topojson_topo1.json b/tests/topotests/bgp_gr_functionality_topo1/bgp_gr_topojson_topo1.json
new file mode 100644
index 0000000000..7b3cac814f
--- /dev/null
+++ b/tests/topotests/bgp_gr_functionality_topo1/bgp_gr_topojson_topo1.json
@@ -0,0 +1,115 @@
+{
+ "ipv4base":"192.168.0.0",
+ "ipv4mask":24,
+ "ipv6base":"fd00::",
+ "ipv6mask":64,
+ "link_ip_start":{"ipv4":"192.168.0.0", "v4mask":24, "ipv6":"fd00::", "v6mask":64},
+ "lo_prefix":{"ipv4":"1.0.", "v4mask":32, "ipv6":"2001:DB8:F::", "v6mask":128},
+ "routers":{
+ "r1":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r2-link1": {"ipv4":"auto", "ipv6":"auto"},
+ "r2-link2": {"ipv4":"auto", "ipv6":"auto"}
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"}
+ ],
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"}
+ ],
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "static_routes":[
+ {
+ "network":"100.0.10.1/32",
+ "no_of_ip":5,
+ "next_hop":"192.168.1.2"
+ },
+ {
+ "network":"1::1/128",
+ "no_of_ip":5,
+ "next_hop":"fd00:0:0:1::2"
+ }]},
+ "r2":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r1-link1": {"ipv4":"auto", "ipv6":"auto"},
+ "r1-link2": {"ipv4":"auto", "ipv6":"auto"}
+ },
+ "bgp": {
+ "local_as": "200",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"}
+ ],
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"}
+ ],
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "static_routes":[
+ {
+ "network":"200.0.20.1/32",
+ "no_of_ip":5,
+ "next_hop":"192.168.1.1"
+ },
+ {
+ "network":"2::1/128",
+ "no_of_ip":5,
+ "next_hop":"fd00:0:0:1::1"
+ }]
+ }
+ }
+}
diff --git a/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py
new file mode 100755
index 0000000000..fdc1bed522
--- /dev/null
+++ b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py
@@ -0,0 +1,3495 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF")
+# in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test BGP Gracefull Restart functionality.
+Basic Common Test steps for all the test case below :
+- Create topology (setup module)
+ Creating 2 routers topology, r1, r2 in IBGP
+- Bring up topology
+- Verify for bgp to converge
+- Configure BGP Garceful Restart on both the routers.
+
+1. Helper BGP router R1, mark and unmark IPV4 routes
+ as stale as the restarting router R2 come up within the restart time.
+2. Helper BGP router R1, mark IPV4 routes as stale and
+ deletes them as the restarting router R2 did-not come up within restart
+ time.
+3. Restart BGP router R1, detects it is connected to R2,
+ which is a helper router. Verify the restart capability i.e. R bit
+ are sent after R1 reloads and comes back.
+4. Verify that the restarting node sets "R" bit while sending the
+ BGP open messages after the node restart, only if GR is enabled.
+5. Verify if restarting node resets R bit in BGP open message
+ during normal BGP session flaps as well, even when GR restarting mode is enabled.
+ Here link flap happen due to interface UP/DOWN.
+6. Verify if restarting node resets R bit in BGP open message
+ during normal BGP session flaps as well, even when GR restarting mode is enabled.
+ Here link flap happen due to neigh router restarts
+7. Verify if restarting node resets R bit in BGP open message
+ during normal BGP session flaps when GR helper mode is enabled.
+ Here link flap happen due to interface UP/DOWN.
+8. Verify if restarting node resets R bit in BGP open message
+ during normal BGP session flaps when GR helper mode is enabled.
+ Here link flap happen due to neigh router restarts.
+9. Verify that restarting nodes set "F" bit while sending
+ the BGP open messages after it restarts, only when BGP GR is enabled.
+10. Verify that restarting nodes reset "F" bit while sending
+ the BGP open messages after it's restarts, when BGP GR is **NOT** enabled.
+11. Verify that only GR helper routers keep the stale
+ route entries, not any GR disabled router.
+12. Verify that GR helper routers keeps all the routes received
+ from restarting node if both the routers are configured as GR restarting node.
+13. Verify that GR helper routers delete all the routes
+ received from a node if both the routers are configured as GR helper node.
+14. Test Objective : After BGP neighborship is established and GR capability
+ is exchanged, transition helper router to disabled state.
+15.Test Objective : After BGP neighborship is established and GR capability
+ is exchanged, transition disabled router to helper state.
+16. Verify transition from Global Restarting to Disable and then
+ Global Disable to Restarting.
+17. Verify transition from Global Helper to Disable and then Global
+ Disable to Helper.
+18. Verify transition from Global Restart to Helper and then Global
+ Helper to Restart.
+19. Verify transition from Peer-level helper to Global Restarting.
+20. Verify transition from Peer-level restart to Global Restart.
+21. Verify transition from Peer-level disabled to Global Restart.
+22. Verify Peer-level inherit from Global Restarting mode.
+23. Verify transition from Peer-level helper to Global inherit helper.
+24. Verify transition from Peer-level restart to Global inherit helper.
+25. Verify transition from Peer-level disbale to Global inherit helper.
+26. Verify default GR functional mode is Helper.
+27. Verify transition from Peer-level Helper to Global Disable.
+28. Verify transition from Peer-level Restarting to Global Disable.
+29. Verify transition from Peer-level Disable to Global Disable.
+30. Verfiy Peer-level inherit from Global Disable mode.
+
+"""
+
+import os
+import sys
+import json
+import time
+import inspect
+import pytest
+from time import sleep
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join("../"))
+sys.path.append(os.path.join("../lib/"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib import topotest
+from lib.topogen import Topogen, TopoRouter, get_topogen
+from lib.topolog import logger
+
+# Required to instantiate the topology builder class.
+from mininet.topo import Topo
+
+# Import topoJson from lib, to create topology and initial configuration
+from lib.topojson import build_topo_from_json, build_config_from_json
+from lib.bgp import (
+ clear_bgp,
+ verify_bgp_rib,
+ verify_graceful_restart,
+ create_router_bgp,
+ verify_r_bit,
+ verify_f_bit,
+ verify_bgp_convergence,
+ verify_graceful_restart_timers,
+)
+
+from lib.common_config import (
+ write_test_header,
+ reset_config_on_routers,
+ start_topology,
+ kill_router_daemons,
+ start_router_daemons,
+ verify_rib,
+ check_address_types,
+ write_test_footer,
+ check_router_status,
+ shutdown_bringup_interface,
+ step,
+ kill_mininet_routers_process,
+ get_frr_ipv6_linklocal,
+ create_route_maps,
+)
+
+# Reading the data from JSON File for topology and configuration creation
+jsonFile = "{}/bgp_gr_topojson_topo1.json".format(CWD)
+try:
+ with open(jsonFile, "r") as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ logger.info("Could not read file:", jsonFile)
+
+
+# Global variables
+NEXT_HOP_IP = {"ipv4": "192.168.1.10", "ipv6": "fd00:0:0:1::10"}
+NEXT_HOP_IP_1 = {"ipv4": "192.168.0.1", "ipv6": "fd00::1"}
+NEXT_HOP_IP_2 = {"ipv4": "192.168.0.2", "ipv6": "fd00::2"}
+BGP_CONVERGENCE = False
+GR_RESTART_TIMER = 20
+PREFERRED_NEXT_HOP = "link_local"
+
+
+class GenerateTopo(Topo):
+ """
+ Test topology builder
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ "Build function"
+ tgen = get_topogen(self)
+
+ # This function only purpose is to create topology
+ # as defined in input json file.
+ #
+ # Create topology (setup module)
+ # Creating 2 routers topology, r1, r2in IBGP
+ # Bring up topology
+
+ # Building topology from json file
+ build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ global ADDR_TYPES
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("=" * 40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(GenerateTopo, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Kill stale mininet routers and process
+ kill_mininet_routers_process(tgen)
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Api call verify whether BGP is converged
+ ADDR_TYPES = check_address_types()
+
+ for addr_type in ADDR_TYPES:
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module : Failed \n Error:" " {}".format(
+ BGP_CONVERGENCE
+ )
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module(mod):
+ """
+ Teardown the pytest environment
+
+ * `mod`: module name
+ """
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info(
+ "Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
+ )
+ logger.info("=" * 40)
+
+
+def configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut, peer):
+ """
+ This function groups the repetitive function calls into one function.
+ """
+
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ for addr_type in ADDR_TYPES:
+ clear_bgp(tgen, addr_type, dut)
+
+ return True
+
+
+def next_hop_per_address_family(
+ tgen, dut, peer, addr_type, next_hop_dict, preferred_next_hop=PREFERRED_NEXT_HOP
+):
+ """
+ This function returns link_local or global next_hop per address-family
+ """
+
+ intferface = topo["routers"][peer]["links"]["{}-link1".format(dut)]["interface"]
+ if addr_type == "ipv6" and "link_local" in preferred_next_hop:
+ next_hop = get_frr_ipv6_linklocal(tgen, peer, intf=intferface)
+ else:
+ next_hop = next_hop_dict[addr_type]
+
+ return next_hop
+
+
+def test_BGP_GR_TC_46_p1(request):
+ """
+ Test Objective : transition from Peer-level helper to Global Restarting
+ Global Mode : GR Restarting
+ PerPeer Mode : GR Helper
+ GR Mode effective : GR Helper
+
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ step(
+ "Configure R1 and R2 as GR restarting node in global"
+ " and helper in per-Peer-level"
+ )
+
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "graceful-restart": {"graceful-restart": True,},
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ },
+ }
+ },
+ "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ step("Verify on R2 that R1 advertises GR capabilities as a restarting node")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Kill BGP on R2")
+
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ step(
+ "Verify that R1 keeps the stale entries in RIB & FIB and R2 keeps stale entries in FIB using"
+ )
+
+ for addr_type in ADDR_TYPES:
+ protocol = "bgp"
+ next_hop = next_hop_per_address_family(
+ tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1
+ )
+ input_topo = {"r1": topo["routers"]["r1"]}
+ result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ next_hop = next_hop_per_address_family(
+ tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {"r2": topo["routers"]["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, "r1", input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step(
+ "Bring up BGP on R1 and remove Peer-level GR config"
+ " from R1 following by a session reset"
+ )
+
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": False}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": False}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ }
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ step("Verify on R2 that R1 advertises GR capabilities as a restarting node")
+
+ input_dict = {
+ "r1": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
+ "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
+ }
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Kill BGP on R1")
+
+ kill_router_daemons(tgen, "r1", ["bgpd"])
+
+ step(
+ "Verify that R1 keeps the stale entries in FIB command and R2 keeps stale entries in RIB & FIB"
+ )
+
+ for addr_type in ADDR_TYPES:
+ protocol = "bgp"
+ next_hop = next_hop_per_address_family(
+ tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {"r2": topo["routers"]["r2"]}
+ result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n Routes are still present \n Error {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ next_hop = next_hop_per_address_family(
+ tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1
+ )
+ input_topo = {"r1": topo["routers"]["r1"]}
+ result = verify_bgp_rib(tgen, addr_type, "r2", input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n Routes are still present \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Start BGP on R1")
+
+ start_router_daemons(tgen, "r1", ["bgpd"])
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_50_p1(request):
+ """
+ Test Objective : Transition from Peer-level helper to Global inherit helper
+ Global Mode : None
+ PerPeer Mode : Helper
+ GR Mode effective : GR Helper
+
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ step(
+ "Configure R1 as GR helper node at per Peer-level for R2"
+ " and configure R2 as global restarting node."
+ )
+
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ step("Verify on R2 that R1 advertises GR capabilities as a helper node")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Kill BGP on R2")
+
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ step(
+ "Verify that R2 keeps the stale entries in FIB & R1 keeps stale entries in RIB & FIB"
+ )
+
+ for addr_type in ADDR_TYPES:
+ protocol = "bgp"
+ next_hop = next_hop_per_address_family(
+ tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1
+ )
+ input_topo = {"r1": topo["routers"]["r1"]}
+ result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ next_hop = next_hop_per_address_family(
+ tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {"r2": topo["routers"]["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, "r1", input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Bring up BGP on R2 and remove Peer-level GR config from R1 ")
+
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": False}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": False}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ }
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ step("Verify on R2 that R1 still advertises GR capabilities as a helper node")
+
+ input_dict = {
+ "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True}}},
+ "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
+ }
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Kill BGP on R2")
+
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ step(
+ "Verify that R2 keeps the stale entries in FIB & R1 keeps stale entries in RIB & FIB"
+ )
+
+ for addr_type in ADDR_TYPES:
+ protocol = "bgp"
+ next_hop = next_hop_per_address_family(
+ tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1
+ )
+ input_topo = {"r1": topo["routers"]["r1"]}
+ result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n Routes are still present \n Error {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ next_hop = next_hop_per_address_family(
+ tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {"r2": topo["routers"]["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, "r1", input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n Routes are still present \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Start BGP on R2")
+
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_51_p1(request):
+ """
+ Test Objective : Transition from Peer-level restarting to Global inherit helper
+ Global Mode : None
+ PerPeer Mode : GR Restart
+ GR Mode effective : GR Restart
+
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ step("Configure R1 as GR restarting node at per Peer-level for R2")
+
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+ step("Verify on R2 that R1 advertises GR capabilities as a restarting node")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Kill BGP on R1")
+
+ kill_router_daemons(tgen, "r1", ["bgpd"])
+
+ step(
+ "Verify that R1 keeps the stale entries in FIB & R2 keeps stale entries in RIB & FIB"
+ )
+
+ for addr_type in ADDR_TYPES:
+ protocol = "bgp"
+ next_hop = next_hop_per_address_family(
+ tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {"r2": topo["routers"]["r2"]}
+ result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ next_hop = next_hop_per_address_family(
+ tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1
+ )
+ input_topo = {"r1": topo["routers"]["r1"]}
+ result = verify_bgp_rib(tgen, addr_type, "r2", input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Bring up BGP on R1 and remove Peer-level GR config")
+
+ start_router_daemons(tgen, "r1", ["bgpd"])
+
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": False}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": False}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ }
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ step("Verify on R2 that R1 advertises GR capabilities as a helper node")
+
+ input_dict = {
+ "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True}}},
+ "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
+ }
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Kill BGPd on R2")
+
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ step(
+ "Verify that R2 keeps the stale entries in FIB & R1 keeps stale entries in RIB & FIB"
+ )
+
+ for addr_type in ADDR_TYPES:
+ protocol = "bgp"
+ next_hop = next_hop_per_address_family(
+ tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1
+ )
+ input_topo = {"r1": topo["routers"]["r1"]}
+ result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n Routes are still present \n Error {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ next_hop = next_hop_per_address_family(
+ tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {"r2": topo["routers"]["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, "r1", input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol)
+ assert (
+ result is True
+ ), "Testcase {} : Failed \n Routes are still present \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Start BGP on R2")
+
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_53_p1(request):
+ """
+ Test Objective : Default GR functional mode is Helper.
+ Global Mode : None
+ PerPeer Mode : None
+ GR Mode effective : GR Helper
+
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ step("configure R2 as global restarting node")
+
+ input_dict = {"r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}}}
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ step(
+ "Verify on R2 that R1 advertises GR capabilities as a helper node based on inherit"
+ )
+
+ input_dict = {
+ "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True}}},
+ "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},
+ }
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Kill BGPd on R2")
+
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ step(
+ "Verify that R2 keeps the stale entries in FIB & R1 keeps stale entries in RIB & FIB"
+ )
+
+ for addr_type in ADDR_TYPES:
+ protocol = "bgp"
+ next_hop = next_hop_per_address_family(
+ tgen, "r2", "r1", addr_type, NEXT_HOP_IP_1
+ )
+ input_topo = {"r1": topo["routers"]["r1"]}
+ result = verify_rib(tgen, addr_type, "r2", input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ for addr_type in ADDR_TYPES:
+ next_hop = next_hop_per_address_family(
+ tgen, "r1", "r2", addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {"r2": topo["routers"]["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, "r1", input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, "r1", input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ step("Start BGP on R2")
+
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_UTP_1_3_p0(request):
+ """
+ Test Objective : Helper BGP router R1, mark and unmark IPV4 routes
+ as stale as the restarting router R2 come up within the restart time
+
+ Test Objective : Helper BGP router R1, mark IPV4 routes as stale and
+ deletes them as the restarting router R2 did-not come up within
+ restart time.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Create route-map to prefer global next-hop
+ input_dict = {
+ "r1": {
+ "route_maps": {
+ "rmap_global": [
+ {"action": "permit", "set": {"ipv6": {"nexthop": "prefer-global"}}}
+ ]
+ }
+ },
+ "r2": {
+ "route_maps": {
+ "rmap_global": [
+ {"action": "permit", "set": {"ipv6": {"nexthop": "prefer-global"}}}
+ ]
+ }
+ },
+ }
+ result = create_route_maps(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ # Configure neighbor for route map
+ input_dict_1 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {
+ "route_maps": [
+ {
+ "name": "rmap_global",
+ "direction": "in",
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {
+ "route_maps": [
+ {
+ "name": "rmap_global",
+ "direction": "in",
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "graceful-restart": {"timer": {"restart-time": GR_RESTART_TIMER}},
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ },
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r2", peer="r1")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r2", peer="r1"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2, preferred_next_hop="global"
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, "bgp")
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : R2 goes for reload ")
+
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ logger.info(
+ "[Phase 3] : R2 is still down, restart time 120 sec."
+ " So time verify the routes are present in BGP RIB"
+ " and ZEBRA"
+ )
+
+ for addr_type in ADDR_TYPES:
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2, preferred_next_hop="global"
+ )
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 4] : sleep for {} sec".format(GR_RESTART_TIMER))
+ sleep(GR_RESTART_TIMER)
+
+ logger.info("[Phase 5] : Verify the routes from r2 ")
+
+ for addr_type in ADDR_TYPES:
+ # Verifying BGP RIB routes
+ next_hop = NEXT_HOP_IP_2[addr_type]
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, expected=False)
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+ logger.info(" Expected behavior: {}".format(result))
+
+ # Verifying RIB routes
+ result = verify_rib(
+ tgen, addr_type, dut, input_topo, next_hop, "bgp", expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+ logger.info(" Expected behavior: {}".format(result))
+
+ logger.info("[Phase 5] : R2 is about to come up now ")
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ logger.info("[Phase 5] : R2 is UP Now ! ")
+
+ for addr_type in ADDR_TYPES:
+ # Verifying GR stats
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r2", peer="r1"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r1", peer="r2")
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2, preferred_next_hop="global"
+ )
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_UTP_15_TC_9_p1(request):
+ """
+ Test Objective : Restart BGP router R1, detects it is connected to R2,
+ which is a helper router. Verify the restart capability i.e. R bit
+ are sent after R1 reloads and comes back.
+
+ Test Objective : Verify that restarting nodes reset "F" bit while sending
+ the BGP open messages after it's restarts, when BGP GR is **NOT** enabled.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Checking router status, starting if not running
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ # reset_config_on_routers(tgen)
+
+ # Create route-map to prefer global next-hop
+ input_dict = {
+ "r1": {
+ "route_maps": {
+ "rmap_global": [
+ {"action": "permit", "set": {"ipv6": {"nexthop": "prefer-global"}}}
+ ]
+ }
+ },
+ "r2": {
+ "route_maps": {
+ "rmap_global": [
+ {"action": "permit", "set": {"ipv6": {"nexthop": "prefer-global"}}}
+ ]
+ }
+ },
+ }
+ result = create_route_maps(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ # Configure neighbor for route map
+ input_dict_1 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {
+ "route_maps": [
+ {
+ "name": "rmap_global",
+ "direction": "in",
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {
+ "route_maps": [
+ {
+ "name": "rmap_global",
+ "direction": "in",
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ logger.info(
+ "[Phase 1] : Test Setup " "[Helper Mode]R1-----R2[Restart Mode] initialized "
+ )
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2, preferred_next_hop="global"
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : R1 goes for reload ")
+
+ kill_router_daemons(tgen, "r1", ["bgpd"])
+
+ logger.info("[Phase 6] : R1 is about to come up now ")
+ start_router_daemons(tgen, "r1", ["bgpd"])
+
+ for addr_type in ADDR_TYPES:
+ # Verifying GR stats
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r2", peer="r1")
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2, preferred_next_hop="global"
+ )
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r1", peer="r2")
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_f_bit(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_UTP_35_p1(request):
+ """
+ Test Objective : Restart BGP router R1 connected to R2,
+ which is a restart router.
+ R1 should not send any GR capability in the open message,
+ however it would process open message from R2 with GR -restart
+ capability, but would not perform any BGP GR functionality.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ logger.info(
+ "[Phase 1] : Test Setup" " [Disable Mode]R1-----R2[Restart Mode] initialized "
+ )
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : R1 goes for reload ")
+
+ kill_router_daemons(tgen, "r1", ["bgpd"])
+
+ logger.info("[Phase 3] : R1 is about to come up now ")
+ start_router_daemons(tgen, "r1", ["bgpd"])
+
+ logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_4_p0(request):
+ """
+ Test Objective : Verify that the restarting node sets "R" bit while sending the
+ BGP open messages after the node restart, only if GR is enabled.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ logger.info(
+ "[Phase 1] : Test Setup" " [Restart Mode]R1-----R2[Helper Mode] initialized "
+ )
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : R2 goes for reload ")
+
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ logger.info(
+ "[Phase 3] : R2 is still down, restart time {} sec."
+ "So time verify the routes are present in BGP RIB and ZEBRA ".format(
+ GR_RESTART_TIMER
+ )
+ )
+
+ for addr_type in ADDR_TYPES:
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(
+ tgen, addr_type, dut, input_topo, next_hop, expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+ logger.info(" Expected behavior: {}".format(result))
+
+ # Verifying RIB routes
+ result = verify_rib(
+ tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+ logger.info(" Expected behavior: {}".format(result))
+
+ logger.info("[Phase 5] : R2 is about to come up now ")
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r1", peer="r2")
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_5_1_2_p1(request):
+ """
+ Test Objective : Verify if restarting node resets R bit in BGP open message
+ during normal BGP session flaps as well, even when GR restarting mode is enabled.
+ Here link flap happen due to interface UP/DOWN.
+
+ """
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ logger.info(
+ "[Phase 1] : Test Setup" " [Restart Mode]R1-----R2[Restart Mode] initialized "
+ )
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : Now flap the link running the BGP session ")
+ # Shutdown interface
+ intf = "r2-r1-eth0"
+ shutdown_bringup_interface(tgen, "r2", intf)
+
+ # Bring up Interface
+ shutdown_bringup_interface(tgen, dut, intf, ifaceaction=True)
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r1", peer="r2")
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : Restart BGPd on router R2. ")
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r1", peer="r2")
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_6_1_2_p1(request):
+ """
+ Test Objective : Verify if restarting node resets R bit in BGP
+ open message during normal BGP session flaps when GR is disabled.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ logger.info(
+ "[Phase 1] : Test Setup" "[Restart Mode]R1-----R2[Helper Mode] initialized "
+ )
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 1] : Changing mode" "[Disable Mode]R1-----R2[Helper Mode]")
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ for addr_type in ADDR_TYPES:
+ clear_bgp(tgen, addr_type, "r1")
+ clear_bgp(tgen, addr_type, "r2")
+
+ # Verify GR stats
+ input_dict = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ # here the verify_graceful_restart fro the neighbor would be
+ # "NotReceived" as the latest GR config is not yet applied.
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : Now flap the link running the BGP session ")
+ # Shutdown interface
+ intf = "r2-r1-eth0"
+ shutdown_bringup_interface(tgen, "r2", intf)
+
+ # Bring up Interface
+ shutdown_bringup_interface(tgen, dut, intf, ifaceaction=True)
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_r_bit(
+ tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("Restart BGPd on R2 ")
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_r_bit(
+ tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_8_p1(request):
+ """
+ Test Objective : Verify that restarting nodes set "F" bit while sending
+ the BGP open messages after it restarts, only when BGP GR is enabled.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ logger.info(
+ "[Phase 1] : Test Setup" " [Restart Mode]R1-----R2[Restart Mode] initialized "
+ )
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "graceful-restart": {"preserve-fw-state": True},
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ },
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : R1 goes for reload ")
+
+ kill_router_daemons(tgen, "r1", ["bgpd"])
+
+ logger.info("[Phase 3] : R1 is about to come up now ")
+ start_router_daemons(tgen, "r1", ["bgpd"])
+
+ logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_r_bit(tgen, topo, addr_type, input_dict, dut="r2", peer="r1")
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_f_bit(tgen, topo, addr_type, input_dict, dut="r2", peer="r1")
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_17_p1(request):
+ """
+ Test Objective : Verify that only GR helper routers keep the stale
+ route entries, not any GR disabled router.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ logger.info("[Phase 1] : Test Setup [Disable]R1-----R2[Restart] initialized ")
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": True,
+ "preserve-fw-state": True,
+ },
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ },
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : R2 goes for reload ")
+
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ logger.info(
+ "[Phase 3] : R2 is still down, restart time 120 sec."
+ " So time verify the routes are present in BGP RIB and ZEBRA "
+ )
+
+ for addr_type in ADDR_TYPES:
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(
+ tgen, addr_type, dut, input_topo, next_hop, expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+ logger.info(" Expected behavior: {}".format(result))
+
+ # Verifying RIB routes
+ result = verify_rib(
+ tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+ logger.info(" Expected behavior: {}".format(result))
+
+ logger.info("[Phase 5] : R2 is about to come up now ")
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ result = verify_r_bit(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_19_p1(request):
+ """
+ Test Objective : Verify that GR helper routers keeps all the routes received
+ from restarting node if both the routers are configured as GR restarting node.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ logger.info("[Phase 1] : Test Setup [Helper]R1-----R2[Restart] initialized ")
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": True,
+ "preserve-fw-state": True,
+ },
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ },
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info(
+ "[Phase 2] : R1's Gr state cahnge to Graceful"
+ " Restart without resetting the session "
+ )
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ logger.info(
+ "[Phase 3] : R2 is still down, restart time 120 sec."
+ " So time verify the routes are present in BGP RIB and ZEBRA "
+ )
+
+ for addr_type in ADDR_TYPES:
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_20_p1(request):
+ """
+ Test Objective : Verify that GR helper routers delete all the routes
+ received from a node if both the routers are configured as GR helper node.
+ """
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ logger.info("[Phase 1] : Test Setup [Helper]R1-----R2[Helper] initialized ")
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "graceful-restart": {
+ "graceful-restart": True,
+ "preserve-fw-state": True,
+ },
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ },
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ kill_router_daemons(tgen, "r2", ["bgpd"])
+
+ for addr_type in ADDR_TYPES:
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(
+ tgen, addr_type, dut, input_topo, next_hop, expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+ logger.info(" Expected behavior: {}".format(result))
+
+ # Verifying RIB routes
+ result = verify_rib(
+ tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+ logger.info(" Expected behavior: {}".format(result))
+
+ logger.info("[Phase 5] : R2 is about to come up now ")
+
+ start_router_daemons(tgen, "r2", ["bgpd"])
+
+ logger.info("[Phase 4] : R2 is UP now, so time to collect GR stats ")
+
+ for addr_type in ADDR_TYPES:
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_31_1_p1(request):
+ """
+ After BGP neighborship is established and GR capability is exchanged,
+ transition restarting router to disabled state and vice versa.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ logger.info(
+ "[Phase 1] : Test Setup" " [Helper Mode]R2-----R1[Restart Mode] initialized "
+ )
+
+ # Configure graceful-restart
+ input_dict = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r1": {
+ "bgp": {
+ "graceful-restart": {"preserve-fw-state": True},
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ },
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ protocol = "bgp"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : R1 Goes from Restart to Disable Mode ")
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ for addr_type in ADDR_TYPES:
+ clear_bgp(tgen, addr_type, "r1")
+ clear_bgp(tgen, addr_type, "r2")
+
+ # Verify GR stats
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : R1 goes for reload ")
+
+ kill_router_daemons(tgen, "r1", ["bgpd"])
+
+ logger.info(
+ "[Phase 3] : R1 is still down, restart time 120 sec."
+ " So time verify the routes are not present in BGP RIB and ZEBRA"
+ )
+
+ for addr_type in ADDR_TYPES:
+ # Verifying RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_rib(
+ tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 4] : R1 is about to come up now ")
+ start_router_daemons(tgen, "r1", ["bgpd"])
+
+ logger.info("[Phase 5] : R1 is UP now, so time to collect GR stats ")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_GR_TC_31_2_p1(request):
+ """
+ After BGP neighborship is established and GR capability is exchanged,
+ transition restarting router to disabled state and vice versa.
+ """
+
+ tgen = get_topogen()
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Check router status
+ check_router_status(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ logger.info(
+ "[Phase 1] : Test Setup " "[Disable Mode]R1-----R2[Restart Mode] initialized "
+ )
+
+ # Configure graceful-restart
+ input_dict = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart-disable": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ dut = "r1"
+ peer = "r2"
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ protocol = "bgp"
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 2] : R2 Goes from Disable to Restart Mode ")
+
+ # Configure graceful-restart
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "graceful-restart": {"preserve-fw-state": True},
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ },
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ for addr_type in ADDR_TYPES:
+ clear_bgp(tgen, addr_type, "r1")
+ clear_bgp(tgen, addr_type, "r2")
+
+ # Verify GR stats
+ input_dict = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {"graceful-restart-helper": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {"graceful-restart": True}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ # here the verify_graceful_restart fro the neighbor would be
+ # "NotReceived" as the latest GR config is not yet applied.
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 3] : R1 goes for reload ")
+
+ kill_router_daemons(tgen, "r1", ["bgpd"])
+
+ logger.info(
+ "[Phase 4] : R1 is still down, restart time 120 sec."
+ " So time verify the routes are present in BGP RIB and ZEBRA "
+ )
+
+ for addr_type in ADDR_TYPES:
+ # Verifying RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ logger.info("[Phase 6] : R1 is about to come up now ")
+ start_router_daemons(tgen, "r1", ["bgpd"])
+
+ logger.info("[Phase 4] : R1 is UP now, so time to collect GR stats ")
+
+ for addr_type in ADDR_TYPES:
+ result = verify_graceful_restart(
+ tgen, topo, addr_type, input_dict, dut="r1", peer="r2"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying BGP RIB routes
+ next_hop = next_hop_per_address_family(
+ tgen, dut, peer, addr_type, NEXT_HOP_IP_2
+ )
+ input_topo = {key: topo["routers"][key] for key in ["r2"]}
+ result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ # Verifying RIB routes
+ result = verify_rib(tgen, addr_type, dut, input_topo, next_hop, protocol)
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
+ write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 5fb07b7377..2dd90e9a86 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -23,6 +23,7 @@ from time import sleep
import traceback
import ipaddr
import os
+import sys
from lib import topotest
from lib.topolog import logger
@@ -35,13 +36,15 @@ from lib.common_config import (
generate_ips,
validate_ip_address,
find_interface_with_greater_ip,
- run_frr_cmd, FRRCFG_FILE,
+ run_frr_cmd,
+ FRRCFG_FILE,
retry,
)
LOGDIR = "/tmp/topotests/"
TMPDIR = None
+
def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True):
"""
API to configure bgp on router
@@ -64,9 +67,9 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
"graceful-restart": True,
"preserve-fw-state": True,
"timer": {
- "restart-time": 300,
- "rib-stale-time": 300,
- "select-defer-time": 300,
+ "restart-time": 30,
+ "rib-stale-time": 30,
+ "select-defer-time": 30,
}
},
"address_family": {
@@ -93,21 +96,21 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
"dest_link": {
"r4": {
"allowas-in": {
- "number_occurences":2
+ "number_occurences": 2
},
- "graceful-restart": True",
"prefix_lists": [
{
"name": "pf_list_1",
"direction": "in"
}
],
- "route_maps": [
- {"name": "RMAP_MED_R3",
- "direction": "in"}
- ],
+ "route_maps": [{
+ "name": "RMAP_MED_R3",
+ "direction": "in"
+ }],
"next_hop_self": True
- }
+ },
+ "r1": {"graceful-restart-helper": True}
}
}
}
@@ -123,7 +126,7 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
-------
True or False
"""
- logger.debug("Entering lib API: create_router_bgp()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
# Flag is used when testing ipv6 over ipv4 or vice-versa
@@ -165,8 +168,12 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
if neigh_unicast:
data_all_bgp = __create_bgp_unicast_neighbor(
- tgen, topo, input_dict, router, afi_test,
- config_data=data_all_bgp
+ tgen,
+ topo,
+ input_dict,
+ router,
+ afi_test,
+ config_data=data_all_bgp,
)
try:
@@ -179,7 +186,7 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: create_router_bgp()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@@ -199,7 +206,7 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
True or False
"""
- logger.debug("Entering lib API: __create_bgp_global()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
bgp_data = input_dict[router]["bgp"]
del_bgp_action = bgp_data.setdefault("delete", False)
@@ -223,8 +230,6 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
cmd = "{} vrf {}".format(cmd, vrf_id)
config_data.append(cmd)
-
- # Skip RFC8212 in topotests
config_data.append("no bgp ebgp-requires-policy")
router_id = bgp_data.setdefault("router_id", None)
@@ -300,12 +305,13 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
config_data.append(cmd)
- logger.debug("Exiting lib API: create_bgp_global()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return config_data
-def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, afi_test,
- config_data=None):
+def __create_bgp_unicast_neighbor(
+ tgen, topo, input_dict, router, afi_test, config_data=None
+):
"""
Helper API to create configuration for address-family unicast
@@ -319,7 +325,7 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, afi_test,
* `build` : Only for initial setup phase this is set as True.
"""
- logger.debug("Entering lib API: __create_bgp_unicast_neighbor()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
add_neigh = True
if "router bgp" in config_data:
@@ -428,7 +434,7 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, afi_test,
config_data.extend(neigh_addr_data)
- logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return config_data
@@ -445,7 +451,7 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
"""
config_data = []
- logger.debug("Entering lib API: __create_bgp_neighbor()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
bgp_data = input_dict[router]["bgp"]["address_family"]
neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
@@ -520,7 +526,7 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
)
config_data.append("{} enforce-multihop".format(neigh_cxt))
- logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return config_data
@@ -541,7 +547,7 @@ def __create_bgp_unicast_address_family(
"""
config_data = []
- logger.debug("Entering lib API: __create_bgp_unicast_neighbor()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
bgp_data = input_dict[router]["bgp"]["address_family"]
neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
@@ -573,26 +579,23 @@ def __create_bgp_unicast_address_family(
]
neigh_cxt = "neighbor {}".format(ip_addr)
- config_data.append("address-family {} unicast".format(
- addr_type
- ))
+ config_data.append("address-family {} unicast".format(addr_type))
if activate_addr_family is not None:
- config_data.append("address-family {} unicast".format(
- activate_addr_family))
-
config_data.append(
- "{} activate".format(neigh_cxt))
+ "address-family {} unicast".format(activate_addr_family)
+ )
+
+ config_data.append("{} activate".format(neigh_cxt))
if deactivate and activate_addr_family is None:
- config_data.append(
- "no neighbor {} activate".format(deactivate))
+ config_data.append("no neighbor {} activate".format(deactivate))
if deactivate_addr_family is not None:
- config_data.append("address-family {} unicast".format(
- deactivate_addr_family))
config_data.append(
- "no {} activate".format(neigh_cxt))
+ "address-family {} unicast".format(deactivate_addr_family)
+ )
+ config_data.append("no {} activate".format(neigh_cxt))
next_hop_self = peer.setdefault("next_hop_self", None)
send_community = peer.setdefault("send_community", None)
@@ -722,7 +725,7 @@ def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
"""
- logger.debug("Entering lib API: modify_bgp_config_when_bgpd_down()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
try:
global LOGDIR
@@ -759,7 +762,7 @@ def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: modify_bgp_config_when_bgpd_down")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -801,7 +804,7 @@ def verify_router_id(tgen, topo, input_dict):
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_router_id()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router in input_dict.keys():
if router not in tgen.routers():
continue
@@ -832,7 +835,7 @@ def verify_router_id(tgen, topo, input_dict):
)
return errormsg
- logger.debug("Exiting lib API: verify_router_id()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -857,7 +860,7 @@ def verify_bgp_convergence(tgen, topo):
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_bgp_convergence()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router, rnode in tgen.routers().iteritems():
if "bgp" not in topo["routers"][router]:
continue
@@ -939,7 +942,7 @@ def modify_as_number(tgen, topo, input_dict):
errormsg(str) or True
"""
- logger.debug("Entering lib API: modify_as_number()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
try:
new_topo = deepcopy(topo["routers"])
@@ -966,8 +969,7 @@ def modify_as_number(tgen, topo, input_dict):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: modify_as_number()")
-
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -1001,7 +1003,7 @@ def verify_as_numbers(tgen, topo, input_dict):
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_as_numbers()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router in input_dict.keys():
if router not in tgen.routers():
continue
@@ -1066,10 +1068,47 @@ def verify_as_numbers(tgen, topo, input_dict):
remote_as,
)
- logger.debug("Exiting lib API: verify_AS_numbers()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
+def clear_bgp(tgen, addr_type, router, vrf=None):
+ """
+ This API is to clear bgp neighborship by running
+ clear ip bgp */clear bgp ipv6 * command,
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `addr_type`: ip type ipv4/ipv6
+ * `router`: device under test
+ * `vrf`: vrf name
+
+ Usage
+ -----
+ clear_bgp(tgen, addr_type, "r1")
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ if vrf:
+ if type(vrf) is not list:
+ vrf = [vrf]
+
+ # Clearing BGP
+ logger.info("Clearing BGP neighborship for router %s..", router)
+ if addr_type == "ipv4":
+ run_frr_cmd(rnode, "clear ip bgp *")
+ elif addr_type == "ipv6":
+ run_frr_cmd(rnode, "clear bgp ipv6 *")
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+
def clear_bgp_and_verify(tgen, topo, router):
"""
This API is to clear bgp neighborship and verify bgp neighborship
@@ -1093,7 +1132,7 @@ def clear_bgp_and_verify(tgen, topo, router):
errormsg(str) or True
"""
- logger.debug("Entering lib API: clear_bgp_and_verify()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if router not in tgen.routers():
return False
@@ -1264,7 +1303,7 @@ def clear_bgp_and_verify(tgen, topo, router):
)
return errormsg
- logger.debug("Exiting lib API: clear_bgp_and_verify()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -1300,7 +1339,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_bgp_timers_and_functionality()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
sleep(5)
router_list = tgen.routers()
for router in input_dict.keys():
@@ -1507,7 +1546,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
bgp_neighbor,
)
- logger.debug("Exiting lib API: verify_bgp_timers_and_functionality()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -1571,7 +1610,7 @@ def verify_bgp_attributes(
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_bgp_attributes()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router, rnode in tgen.routers().iteritems():
if router != dut:
continue
@@ -1646,7 +1685,7 @@ def verify_bgp_attributes(
)
return errormsg
- logger.debug("Exiting lib API: verify_bgp_attributes()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -1701,7 +1740,7 @@ def verify_best_path_as_per_bgp_attribute(
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_best_path_as_per_bgp_attribute()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if router not in tgen.routers():
return False
@@ -1811,7 +1850,7 @@ def verify_best_path_as_per_bgp_attribute(
router,
)
- logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -1850,7 +1889,7 @@ def verify_best_path_as_per_admin_distance(
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_best_path_as_per_admin_distance()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
router_list = tgen.routers()
if router not in router_list:
return False
@@ -1922,11 +1961,11 @@ def verify_best_path_as_per_admin_distance(
router,
)
- logger.info("Exiting lib API: verify_best_path_as_per_admin_distance()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
-@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+@retry(attempts=10, wait=2, return_is_str=True, initial_wait=2)
def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
"""
This API is to verify whether bgp rib has any
@@ -1956,7 +1995,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_bgp_rib()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
@@ -2171,7 +2210,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
"routes are: {}\n".format(dut, found_routes)
)
- logger.debug("Exiting lib API: verify_bgp_rib()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -2234,7 +2273,7 @@ def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer):
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_graceful_restart()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router, rnode in tgen.routers().iteritems():
if router != dut:
@@ -2273,8 +2312,6 @@ def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer):
isjson=True,
)
- logger.info("show_bgp_graceful_json {}".format(show_bgp_graceful_json))
-
show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
@@ -2422,7 +2459,7 @@ def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer):
)
return errormsg
- logger.debug("Exiting lib API: verify_graceful_restart()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -2484,7 +2521,7 @@ def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer):
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_r_bit()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router, rnode in tgen.routers().iteritems():
if router != dut:
@@ -2540,11 +2577,11 @@ def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer):
errormsg = "[DUT: {}]: Rbit false {}".format(dut, neighbor_ip)
return errormsg
- logger.debug("Exiting lib API: verify_r_bit()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
-@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_eor(tgen, topo, addr_type, input_dict, dut, peer):
"""
This API is to verify EOR
@@ -2602,7 +2639,7 @@ def verify_eor(tgen, topo, addr_type, input_dict, dut, peer):
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_eor()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router, rnode in tgen.routers().iteritems():
if router != dut:
@@ -2651,58 +2688,59 @@ def verify_eor(tgen, topo, addr_type, input_dict, dut, peer):
)
return errormsg
- for afi, afi_data in show_bgp_graceful_json_out.items():
- if "v4" not in afi and "v6" not in afi:
- continue
+ if addr_type == "ipv4":
+ afi = "ipv4Unicast"
+ elif addr_type == "ipv6":
+ afi = "ipv6Unicast"
+ else:
+ errormsg = "Address type %s is not supported" % (addr_type)
+ return errormsg
- eor_json = afi_data["endOfRibStatus"]
- if "endOfRibSend" in eor_json:
+ eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"]
+ if "endOfRibSend" in eor_json:
- if eor_json["endOfRibSend"]:
- logger.info(
- "[DUT: %s]: EOR Send true for %s " "%s",
- dut,
- neighbor_ip,
- afi,
- )
- else:
- errormsg = "[DUT: %s]: EOR Send false for %s" " %s" % (
- dut,
- neighbor_ip,
- afi,
- )
- return errormsg
+ if eor_json["endOfRibSend"]:
+ logger.info(
+ "[DUT: %s]: EOR Send true for %s " "%s", dut, neighbor_ip, afi
+ )
+ else:
+ errormsg = "[DUT: %s]: EOR Send false for %s" " %s" % (
+ dut,
+ neighbor_ip,
+ afi,
+ )
+ return errormsg
- if "endOfRibRecv" in eor_json:
- if eor_json["endOfRibRecv"]:
- logger.info(
- "[DUT: %s]: EOR Recv true %s " "%s", dut, neighbor_ip, afi
- )
- else:
- errormsg = "[DUT: %s]: EOR Recv false %s " "%s" % (
- dut,
- neighbor_ip,
- afi,
- )
- return errormsg
+ if "endOfRibRecv" in eor_json:
+ if eor_json["endOfRibRecv"]:
+ logger.info(
+ "[DUT: %s]: EOR Recv true %s " "%s", dut, neighbor_ip, afi
+ )
+ else:
+ errormsg = "[DUT: %s]: EOR Recv false %s " "%s" % (
+ dut,
+ neighbor_ip,
+ afi,
+ )
+ return errormsg
- if "endOfRibSentAfterUpdate" in eor_json:
- if eor_json["endOfRibSentAfterUpdate"]:
- logger.info(
- "[DUT: %s]: EOR SendTime true for %s" " %s",
- dut,
- neighbor_ip,
- afi,
- )
- else:
- errormsg = "[DUT: %s]: EOR SendTime false for " "%s %s" % (
- dut,
- neighbor_ip,
- afi,
- )
- return errormsg
+ if "endOfRibSentAfterUpdate" in eor_json:
+ if eor_json["endOfRibSentAfterUpdate"]:
+ logger.info(
+ "[DUT: %s]: EOR SendTime true for %s" " %s",
+ dut,
+ neighbor_ip,
+ afi,
+ )
+ else:
+ errormsg = "[DUT: %s]: EOR SendTime false for " "%s %s" % (
+ dut,
+ neighbor_ip,
+ afi,
+ )
+ return errormsg
- logger.debug("Exiting lib API: verify_eor()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -2766,7 +2804,7 @@ def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer):
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_f_bit()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router, rnode in tgen.routers().iteritems():
if router != dut:
@@ -2815,21 +2853,34 @@ def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer):
)
return errormsg
- for afi, afi_data in show_bgp_graceful_json_out.items():
- if "v4" not in afi and "v6" not in afi:
- continue
+ if "ipv4Unicast" in show_bgp_graceful_json_out:
+ if show_bgp_graceful_json_out["ipv4Unicast"]["fBit"]:
+ logger.info(
+ "[DUT: {}]: Fbit True for {} IPv4"
+ " Unicast".format(dut, neighbor_ip)
+ )
+ else:
+ errormsg = "[DUT: {}]: Fbit False for {} IPv4" " Unicast".format(
+ dut, neighbor_ip
+ )
+ return errormsg
- if afi_data["fBit"]:
+ elif "ipv6Unicast" in show_bgp_graceful_json_out:
+ if show_bgp_graceful_json_out["ipv6Unicast"]["fBit"]:
logger.info(
- "[DUT: {}]: Fbit True for {} {}".format(dut, neighbor_ip, afi)
+ "[DUT: {}]: Fbit True for {} IPv6"
+ " Unicast".format(dut, neighbor_ip)
)
else:
- errormsg = "[DUT: {}]: Fbit False for {} {}".format(
- dut, neighbor_ip, afi
+ errormsg = "[DUT: {}]: Fbit False for {} IPv6" " Unicast".format(
+ dut, neighbor_ip
)
return errormsg
+ else:
+ show_bgp_graceful_json_out["ipv4Unicast"]
+ show_bgp_graceful_json_out["ipv6Unicast"]
- logger.debug("Exiting lib API: verify_f_bit()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -2879,7 +2930,7 @@ def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer)
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_graceful_restart_timers()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router, rnode in tgen.routers().iteritems():
if router != dut:
@@ -2955,7 +3006,7 @@ def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer)
)
return errormsg
- logger.debug("Exiting lib API: verify_graceful_restart_timers")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -2970,20 +3021,20 @@ def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut):
* `tgen`: topogen object
* `topo`: input json file data
* `addr_type` : ip type ipv4/ipv6
- * `addr_family` : address family type IPV4 Unicast/IPV6 Unicast
+ * `addr_type` : ip type IPV4 Unicast/IPV6 Unicast
* `dut`: input dut router name
Usage
-----
- result = verify_gr_address_family(tgen, topo, addr_type, addr_family,
- , dut)
+
+ result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1")
Returns
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_gr_address_family()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router, rnode in tgen.routers().iteritems():
if router != dut:
@@ -3025,19 +3076,25 @@ def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut):
errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
return errormsg
- if "v4" in addr_family:
- input_afi = "v4"
- elif "v6" in addr_family:
- input_afi = "v6"
-
- for afi in show_bgp_graceful_json_out.keys():
- if input_afi not in afi:
- continue
+ if addr_family == "ipv4Unicast":
+ if "ipv4Unicast" in show_bgp_graceful_json_out:
+ logger.info("ipv4Unicast present for {} ".format(neighbor_ip))
+ return True
else:
- logger.info("{} present for {} ".format(addr_family, neighbor_ip))
+ errormsg = "ipv4Unicast NOT present for {} ".format(neighbor_ip)
+ return errormsg
+
+ elif addr_family == "ipv6Unicast":
+ if "ipv6Unicast" in show_bgp_graceful_json_out:
+ logger.info("ipv6Unicast present for {} ".format(neighbor_ip))
return True
+ else:
+ errormsg = "ipv6Unicast NOT present for {} ".format(neighbor_ip)
+ return errormsg
else:
- errormsg = "{} NOT present for {} ".format(addr_family, neighbor_ip)
+ errormsg = "Aaddress family: {} present for {} ".format(
+ addr_family, neighbor_ip
+ )
return errormsg
- logger.debug("Exiting lib API: verify_gr_address_family()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index 72ee28c1c6..0b19877aff 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -32,6 +32,7 @@ from tempfile import mkdtemp
import StringIO
import os
+import sys
import ConfigParser
import traceback
import socket
@@ -100,13 +101,11 @@ SEQ_ID = {"prefix_lists": {}, "route_maps": {}}
def get_seq_id(obj_type, router, obj_name):
"""
Generates and saves sequence number in interval of 10
-
Parameters
----------
* `obj_type`: prefix_lists or route_maps
* `router`: router name
*` obj_name`: name of the prefix-list or route-map
-
Returns
--------
Sequence number generated
@@ -125,7 +124,6 @@ def get_seq_id(obj_type, router, obj_name):
def set_seq_id(obj_type, router, id, obj_name):
"""
Saves sequence number if not auto-generated and given by user
-
Parameters
----------
* `obj_type`: prefix_lists or route_maps
@@ -149,11 +147,9 @@ class InvalidCLIError(Exception):
def run_frr_cmd(rnode, cmd, isjson=False):
"""
Execute frr show commands in priviledged mode
-
* `rnode`: router node on which commands needs to executed
* `cmd`: Command to be executed on frr
* `isjson`: If command is to get json data or not
-
:return str:
"""
@@ -179,12 +175,13 @@ def run_frr_cmd(rnode, cmd, isjson=False):
raise InvalidCLIError("No actual cmd passed")
-def create_common_configuration(tgen, router, data, config_type=None, build=False, load_config=True):
+def create_common_configuration(
+ tgen, router, data, config_type=None, build=False, load_config=True
+):
"""
API to create object of class FRRConfig and also create frr_json.conf
file. It will create interface and common configurations and save it to
frr_json.conf and load to router
-
Parameters
----------
* `tgen`: tgen onject
@@ -193,7 +190,6 @@ def create_common_configuration(tgen, router, data, config_type=None, build=Fals
* `config_type` : Syntactic information while writing configuration. Should
be one of the value as mentioned in the config_map below.
* `build` : Only for initial setup phase this is set as True
-
Returns
-------
True or False
@@ -248,13 +244,12 @@ def kill_router_daemons(tgen, router, daemons):
"""
Router's current config would be saved to /etc/frr/ for each deamon
and deamon would be killed forcefully using SIGKILL.
-
* `tgen` : topogen object
* `router`: Device under test
* `daemons`: list of daemons to be killed
"""
- logger.debug("Entering lib API: kill_router_daemons()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
try:
router_list = tgen.routers()
@@ -278,20 +273,18 @@ def kill_router_daemons(tgen, router, daemons):
def start_router_daemons(tgen, router, daemons):
"""
Daemons defined by user would be started
-
* `tgen` : topogen object
* `router`: Device under test
* `daemons`: list of daemons to be killed
"""
- logger.debug("Entering lib API: start_router_daemons()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
try:
router_list = tgen.routers()
# Start daemons
result = router_list[router].startDaemons(daemons)
- sleep(5)
return result
except Exception as e:
@@ -299,14 +292,13 @@ def start_router_daemons(tgen, router, daemons):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: start_router_daemons()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
def kill_mininet_routers_process(tgen):
"""
Kill all mininet stale router' processes
-
* `tgen` : topogen object
"""
@@ -331,11 +323,10 @@ def kill_mininet_routers_process(tgen):
def check_router_status(tgen):
"""
Check if all daemons are running for all routers in topology
-
* `tgen` : topogen object
"""
- logger.debug("Entering lib API: start_router_daemons()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
try:
router_list = tgen.routers()
@@ -356,7 +347,7 @@ def check_router_status(tgen):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: start_router_daemons()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -364,7 +355,6 @@ def reset_config_on_routers(tgen, routerName=None):
"""
Resets configuration on routers to the snapshot created using input JSON
file. It replaces existing router configuration with FRRCFG_BKUP_FILE
-
Parameters
----------
* `tgen` : Topogen object
@@ -480,7 +470,6 @@ def reset_config_on_routers(tgen, routerName=None):
def load_config_to_router(tgen, routerName, save_bkup=False):
"""
Loads configuration on router from the file FRRCFG_FILE.
-
Parameters
----------
* `tgen` : Topogen object
@@ -536,16 +525,13 @@ def get_frr_ipv6_linklocal(tgen, router, intf=None):
"""
API to get the link local ipv6 address of a perticular interface using
FRR command 'show interface'
-
* `tgen`: tgen onject
* `router` : router for which hightest interface should be
calculated
* `intf` : interface for which linklocal address needs to be taken
-
Usage
-----
linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
-
Returns
-------
1) array of interface names to link local ips.
@@ -675,11 +661,9 @@ def number_to_column(routerName):
def validate_ip_address(ip_address):
"""
Validates the type of ip address
-
Parameters
----------
* `ip_address`: IPv4/IPv6 address
-
Returns
-------
Type of address as string
@@ -746,7 +730,6 @@ def generate_ips(network, no_of_ips):
"""
Returns list of IPs.
based on start_ip and no_of_ips
-
* `network` : from here the ip will start generating,
start_ip will be
* `no_of_ips` : these many IPs will be generated
@@ -787,7 +770,6 @@ def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
Returns highest interface ip for ipv4/ipv6. If loopback is there then
it will return highest IP from loopback IPs otherwise from physical
interface IPs.
-
* `topo` : json file data
* `router` : router for which hightest interface should be calculated
"""
@@ -835,11 +817,9 @@ def write_test_footer(tc_name):
def interface_status(tgen, topo, input_dict):
"""
Delete ip route maps from device
-
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : for which router, route map has to be deleted
-
Usage
-----
input_dict = {
@@ -852,7 +832,7 @@ def interface_status(tgen, topo, input_dict):
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: interface_status()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
try:
global frr_cfg
@@ -876,19 +856,17 @@ def interface_status(tgen, topo, input_dict):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: interface_status()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0):
"""
Retries function execution, if return is an errormsg or exception
-
* `attempts`: Number of attempts to make
* `wait`: Number of seconds to wait between each attempt
* `return_is_str`: Return val is an errormsg in case of failure
* `initial_wait`: Sleeps for this much seconds before executing function
-
"""
def _retry(func):
@@ -967,13 +945,11 @@ def create_interfaces_cfg(tgen, topo, build=False):
"""
Create interface configuration for created topology. Basic Interface
configuration is provided in input json file.
-
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `build` : Only for initial setup phase this is set as True.
-
Returns
-------
True or False
@@ -1012,13 +988,11 @@ def create_interfaces_cfg(tgen, topo, build=False):
def create_static_routes(tgen, input_dict, build=False):
"""
Create static routes for given router as defined in input_dict
-
Parameters
----------
* `tgen` : Topogen object
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
-
Usage
-----
input_dict should be in the format below:
@@ -1029,7 +1003,6 @@ def create_static_routes(tgen, input_dict, build=False):
# next_hop: starting next-hop address
# tag: tag id for static routes
# delete: True if config to be removed. Default False.
-
Example:
"routers": {
"r1": {
@@ -1045,13 +1018,12 @@ def create_static_routes(tgen, input_dict, build=False):
]
}
}
-
Returns
-------
errormsg(str) or True
"""
result = False
- logger.debug("Entering lib API: create_static_routes()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
input_dict = deepcopy(input_dict)
try:
for router in input_dict.keys():
@@ -1108,7 +1080,7 @@ def create_static_routes(tgen, input_dict, build=False):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: create_static_routes()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@@ -1116,13 +1088,11 @@ def create_prefix_lists(tgen, input_dict, build=False):
"""
Create ip prefix lists as per the config provided in input
JSON or input_dict
-
Parameters
----------
* `tgen` : Topogen object
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
-
Usage
-----
# pf_lists_1: name of prefix-list, user defined
@@ -1131,7 +1101,6 @@ def create_prefix_lists(tgen, input_dict, build=False):
# action: permit/deny
# le: less than or equal number of bits
# ge: greater than or equal number of bits
-
Example
-------
input_dict = {
@@ -1152,13 +1121,12 @@ def create_prefix_lists(tgen, input_dict, build=False):
}
}
}
-
Returns
-------
errormsg or True
"""
- logger.debug("Entering lib API: create_prefix_lists()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
try:
for router in input_dict.keys():
@@ -1217,20 +1185,18 @@ def create_prefix_lists(tgen, input_dict, build=False):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: create_prefix_lists()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
def create_route_maps(tgen, input_dict, build=False):
"""
Create route-map on the devices as per the arguments passed
-
Parameters
----------
* `tgen` : Topogen object
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
-
Usage
-----
# route_maps: key, value pair for route-map name and its attribute
@@ -1251,7 +1217,6 @@ def create_route_maps(tgen, input_dict, build=False):
# large_community: large community value to be attached
# community_additive: if set to "additive", adds community/large-community
value to the existing values of the network prefix
-
Example:
--------
input_dict = {
@@ -1267,7 +1232,6 @@ def create_route_maps(tgen, input_dict, build=False):
"ipv6": {
"prefix_list": "pf_list_1"
}
-
"large-community-list": {
"id": "community_1",
"exact_match": True
@@ -1300,14 +1264,13 @@ def create_route_maps(tgen, input_dict, build=False):
}
}
}
-
Returns
-------
errormsg(str) or True
"""
result = False
- logger.debug("Entering lib API: create_route_maps()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
input_dict = deepcopy(input_dict)
try:
for router in input_dict.keys():
@@ -1572,18 +1535,16 @@ def create_route_maps(tgen, input_dict, build=False):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: create_route_maps()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
def delete_route_maps(tgen, input_dict):
"""
Delete ip route maps from device
-
* `tgen` : Topogen object
* `input_dict` : for which router,
route map has to be deleted
-
Usage
-----
# Delete route-map rmap_1 and rmap_2 from router r1
@@ -1593,12 +1554,11 @@ def delete_route_maps(tgen, input_dict):
}
}
result = delete_route_maps("ipv4", input_dict)
-
Returns
-------
errormsg(str) or True
"""
- logger.info("Entering lib API: delete_route_maps()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router in input_dict.keys():
route_maps = input_dict[router]["route_maps"][:]
@@ -1614,7 +1574,6 @@ def create_bgp_community_lists(tgen, input_dict, build=False):
"""
Create bgp community-list or large-community-list on the devices as per
the arguments passed. Takes list of communities in input.
-
Parameters
----------
* `tgen` : Topogen object
@@ -1640,7 +1599,7 @@ def create_bgp_community_lists(tgen, input_dict, build=False):
"""
result = False
- logger.debug("Entering lib API: create_bgp_community_lists()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
input_dict = deepcopy(input_dict)
try:
for router in input_dict.keys():
@@ -1696,30 +1655,26 @@ def create_bgp_community_lists(tgen, input_dict, build=False):
logger.error(errormsg)
return errormsg
- logger.debug("Exiting lib API: create_bgp_community_lists()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
"""
Shutdown or bringup router's interface "
-
* `tgen` : Topogen object
* `dut` : Device under test
* `intf_name` : Interface name to be shut/no shut
* `ifaceaction` : Action, to shut/no shut interface,
by default is False
-
Usage
-----
dut = "r3"
intf = "r3-r1-eth0"
# Shut down ineterface
shutdown_bringup_interface(tgen, dut, intf, False)
-
# Bring up ineterface
shutdown_bringup_interface(tgen, dut, intf, True)
-
Returns
-------
errormsg(str) or True
@@ -1745,7 +1700,6 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
advertise_networks_using_network_command() and do will verify next_hop and
each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
command o/p.
-
Parameters
----------
* `tgen` : topogen object
@@ -1755,14 +1709,12 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
* `next_hop`[optional]: next_hop which needs to be verified,
default: static
* `protocol`[optional]: protocol, default = None
-
Usage
-----
# RIB can be verified for static routes OR network advertised using
network command. Following are input_dicts to create static routes
and advertise networks using network command. Any one of the input_dict
can be passed to verify_rib() to verify routes in DUT"s RIB.
-
# Creating static routes for r1
input_dict = {
"r1": {
@@ -1780,13 +1732,12 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
dut = "r2"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
-
Returns
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_rib()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
router_list = tgen.routers()
for routerInput in input_dict.keys():
@@ -1820,10 +1771,9 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
static_routes = input_dict[routerInput]["static_routes"]
st_found = False
nh_found = False
+ found_routes = []
+ missing_routes = []
for static_route in static_routes:
- found_routes = []
- missing_routes = []
-
network = static_route["network"]
if "no_of_ip" in static_route:
no_of_ip = static_route["no_of_ip"]
@@ -1923,7 +1873,9 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
next_hop = [next_hop]
for nh in next_hop:
- for nh_json in rib_routes_json[st_rt][0]["nexthops"]:
+ for nh_json in rib_routes_json[st_rt][0][
+ "nexthops"
+ ]:
if nh != nh_json["ip"]:
continue
nh_found = True
@@ -1959,7 +1911,7 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
" routes are: {}\n".format(addr_type, dut, found_routes)
)
- logger.debug("Exiting lib API: verify_rib()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -1967,7 +1919,6 @@ def verify_admin_distance_for_static_routes(tgen, input_dict):
"""
API to verify admin distance for static routes as defined in input_dict/
input JSON by running show ip/ipv6 route json command.
-
Parameter
---------
* `tgen` : topogen object
@@ -1987,13 +1938,12 @@ def verify_admin_distance_for_static_routes(tgen, input_dict):
}
}
result = verify_admin_distance_for_static_routes(tgen, input_dict)
-
Returns
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_admin_distance_for_static_routes()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router in input_dict.keys():
if router not in tgen.routers():
@@ -2050,7 +2000,7 @@ def verify_admin_distance_for_static_routes(tgen, input_dict):
)
return errormsg
- logger.debug("Exiting lib API: verify_admin_distance_for_static_routes()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -2058,12 +2008,10 @@ def verify_prefix_lists(tgen, input_dict):
"""
Running "show ip prefix-list" command and verifying given prefix-list
is present in router.
-
Parameters
----------
* `tgen` : topogen object
* `input_dict`: data to verify prefix lists
-
Usage
-----
# To verify pf_list_1 is present in router r1
@@ -2072,13 +2020,12 @@ def verify_prefix_lists(tgen, input_dict):
"prefix_lists": ["pf_list_1"]
}}
result = verify_prefix_lists("ipv4", input_dict, tgen)
-
Returns
-------
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_prefix_lists()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router in input_dict.keys():
if router not in tgen.routers():
@@ -2109,7 +2056,7 @@ def verify_prefix_lists(tgen, input_dict):
router,
)
- logger.debug("Exiting lib API: verify_prefix_lists()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -2136,7 +2083,7 @@ def verify_route_maps(tgen, input_dict):
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_route_maps()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router in input_dict.keys():
if router not in tgen.routers():
@@ -2161,7 +2108,7 @@ def verify_route_maps(tgen, input_dict):
router,
)
- logger.debug("Exiting lib API: verify_route_maps()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -2170,7 +2117,6 @@ def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
"""
API to veiryf BGP large community is attached in route for any given
DUT by running "show bgp ipv4/6 {route address} json" command.
-
Parameters
----------
* `tgen`: topogen object
@@ -2186,13 +2132,12 @@ def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
"largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
}
result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
-
Returns
-------
errormsg(str) or True
"""
- logger.info("Entering lib API: verify_bgp_community()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if router not in tgen.routers():
return False
@@ -2254,7 +2199,7 @@ def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
)
return errormsg
- logger.debug("Exiting lib API: verify_bgp_community()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@@ -2283,7 +2228,7 @@ def verify_create_community_list(tgen, input_dict):
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_create_community_list()")
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for router in input_dict.keys():
if router not in tgen.routers():
@@ -2311,5 +2256,5 @@ def verify_create_community_list(tgen, input_dict):
)
return errormsg
- logger.debug("Exiting lib API: verify_create_community_list()")
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True