This function groups the repetitive function calls into one function.
"""
+ logger.info("configure_gr_followed_by_clear: dut %s peer %s", dut, peer)
+
result = create_router_bgp(tgen, topo, input_dict)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
# Creating configuration from JSON
reset_config_on_routers(tgen)
- logger.info(
- "[Step 1] : Test Setup " "[Helper Mode]R3-----R1[Restart Mode] initialized"
- )
+ step("Test Setup: [Helper Mode]R3-----R1[Restart Mode] initialized")
# Configure graceful-restart
input_dict = {
configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r3")
for addr_type in ADDR_TYPES:
+ step("Verifying GR config and operational state for addr_type {}".format(addr_type))
+
result = verify_graceful_restart(
tgen, topo, addr_type, input_dict, dut="r1", peer="r3"
)
# verify multi address family
result = verify_gr_address_family(
- tgen, topo, addr_type, "ipv4Unicast", dut="r1"
+ tgen, topo, addr_type, "ipv4Unicast", dut="r1", peer="r3",
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
# verify multi address family
result = verify_gr_address_family(
- tgen, topo, addr_type, "ipv6Unicast", dut="r1"
+ tgen, topo, addr_type, "ipv6Unicast", dut="r1", peer="r3",
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
# verify multi address family
result = verify_gr_address_family(
- tgen, topo, addr_type, "ipv4Unicast", dut="r3"
+ tgen, topo, addr_type, "ipv4Unicast", dut="r3", peer="r1",
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
# verify multi address family
result = verify_gr_address_family(
- tgen, topo, addr_type, "ipv6Unicast", dut="r3"
+ tgen, topo, addr_type, "ipv6Unicast", dut="r3", peer="r1",
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
+ step("Killing bgpd on r1")
+
# Kill BGPd daemon on R1
kill_router_daemons(tgen, "r1", ["bgpd"])
tc_name, result
)
+ step("Starting bgpd on r1")
+
# Start BGPd daemon on R1
start_router_daemons(tgen, "r1", ["bgpd"])
# verify multi address family
result = verify_gr_address_family(
- tgen, topo, addr_type, "ipv4Unicast", dut="r1"
+ tgen, topo, addr_type, "ipv4Unicast", dut="r1", peer="r3",
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
# verify multi address family
result = verify_gr_address_family(
- tgen, topo, addr_type, "ipv6Unicast", dut="r1"
+ tgen, topo, addr_type, "ipv6Unicast", dut="r1", peer="r3",
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
# verify multi address family
result = verify_gr_address_family(
- tgen, topo, addr_type, "ipv4Unicast", dut="r3"
+ tgen, topo, addr_type, "ipv4Unicast", dut="r3", peer="r1",
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
# verify multi address family
result = verify_gr_address_family(
- tgen, topo, addr_type, "ipv6Unicast", dut="r3"
+ tgen, topo, addr_type, "ipv6Unicast", dut="r3", peer="r1",
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
@retry(retry_timeout=8)
-def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut, expected=True):
+def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut, peer, expected=True):
"""
This API is to verify gr_address_family in the BGP gr capability advertised
by the neighbor router
* `addr_type` : ip type ipv4/ipv6
* `addr_type` : ip type IPV4 Unicast/IPV6 Unicast
* `dut`: input dut router name
+ * `peer`: input peer router to check
* `expected` : expected results from API, by-default True
Usage
-----
- result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1")
+ result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1", "r3")
Returns
-------
- errormsg(str) or True
+ errormsg(str) or None
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
- for router, rnode in tgen.routers().items():
- if router != dut:
- continue
+ if not check_address_types(addr_type):
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return
- bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+ routers = tgen.routers()
+ if dut not in routers:
+ return "{} not in routers".format(dut)
- if addr_type in bgp_addr_type:
- if not check_address_types(addr_type):
- continue
+ rnode = routers[dut]
+ bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
- bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+ if addr_type not in bgp_addr_type:
+ return "{} not in bgp_addr_types".format(addr_type)
- for bgp_neighbor, peer_data in bgp_neighbors.items():
- for dest_link, peer_dict in peer_data["dest_link"].items():
- data = topo["routers"][bgp_neighbor]["links"]
+ if peer not in bgp_addr_type[addr_type]["unicast"]["neighbor"]:
+ return "{} not a peer of {} over {}".format(peer, dut, addr_type)
- if dest_link in data:
- neighbor_ip = data[dest_link][addr_type].split("/")[0]
+ nbr_links = topo["routers"][peer]["links"]
+ if dut not in nbr_links or addr_type not in nbr_links[dut]:
+ return "peer {} missing back link to {} over {}".format(peer, dut, addr_type)
- logger.info(
- "[DUT: {}]: Checking bgp graceful-restart"
- " show o/p {}".format(dut, neighbor_ip)
- )
+ neighbor_ip = nbr_links[dut][addr_type].split("/")[0]
- show_bgp_graceful_json = run_frr_cmd(
- rnode,
- "show bgp {} neighbor {} graceful-restart json".format(
- addr_type, neighbor_ip
- ),
- isjson=True,
- )
+ logger.info(
+ "[DUT: {}]: Checking bgp graceful-restart show o/p {} for {}".format(
+ dut, neighbor_ip, addr_family
+ )
+ )
- show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
+ show_bgp_graceful_json = run_frr_cmd(
+ rnode,
+ "show bgp {} neighbor {} graceful-restart json".format(
+ addr_type, neighbor_ip
+ ),
+ isjson=True,
+ )
- if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
- logger.info("Neighbor ip matched {}".format(neighbor_ip))
- else:
- errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
- return errormsg
+ show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
- if addr_family == "ipv4Unicast":
- if "ipv4Unicast" in show_bgp_graceful_json_out:
- logger.info("ipv4Unicast present for {} ".format(neighbor_ip))
- return True
- else:
- errormsg = "ipv4Unicast NOT present for {} ".format(neighbor_ip)
- return errormsg
+ if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
+ logger.info("Neighbor ip matched {}".format(neighbor_ip))
+ else:
+ errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
+ return errormsg
- elif addr_family == "ipv6Unicast":
- if "ipv6Unicast" in show_bgp_graceful_json_out:
- logger.info("ipv6Unicast present for {} ".format(neighbor_ip))
- return True
- else:
- errormsg = "ipv6Unicast NOT present for {} ".format(neighbor_ip)
- return errormsg
- else:
- errormsg = "Aaddress family: {} present for {} ".format(
- addr_family, neighbor_ip
- )
- return errormsg
+ if addr_family == "ipv4Unicast":
+ if "ipv4Unicast" in show_bgp_graceful_json_out:
+ logger.info("ipv4Unicast present for {} ".format(neighbor_ip))
+ return True
+ else:
+ errormsg = "ipv4Unicast NOT present for {} ".format(neighbor_ip)
+ return errormsg
+
+ elif addr_family == "ipv6Unicast":
+ if "ipv6Unicast" in show_bgp_graceful_json_out:
+ logger.info("ipv6Unicast present for {} ".format(neighbor_ip))
+ return True
+ else:
+ errormsg = "ipv6Unicast NOT present for {} ".format(neighbor_ip)
+ return errormsg
+ else:
+ errormsg = "Aaddress family: {} present for {} ".format(
+ addr_family, neighbor_ip
+ )
+ return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))