summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/bgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/bgp.py')
-rw-r--r--tests/topotests/lib/bgp.py110
1 files changed, 58 insertions, 52 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 2f1f67439f..922dee1291 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -3765,7 +3765,7 @@ def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer)
@retry(retry_timeout=8)
-def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut, expected=True):
+def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut, peer, expected=True):
"""
This API is to verify gr_address_family in the BGP gr capability advertised
by the neighbor router
@@ -3777,80 +3777,86 @@ def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut, expected=T
* `addr_type` : ip type ipv4/ipv6
* `addr_type` : ip type IPV4 Unicast/IPV6 Unicast
* `dut`: input dut router name
+ * `peer`: input peer router to check
* `expected` : expected results from API, by-default True
Usage
-----
- result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1")
+ result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1", "r3")
Returns
-------
- errormsg(str) or True
+ errormsg(str) or None
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
- for router, rnode in tgen.routers().items():
- if router != dut:
- continue
+ if not check_address_types(addr_type):
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return
- bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+ routers = tgen.routers()
+ if dut not in routers:
+ return "{} not in routers".format(dut)
- if addr_type in bgp_addr_type:
- if not check_address_types(addr_type):
- continue
+ rnode = routers[dut]
+ bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
- bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+ if addr_type not in bgp_addr_type:
+ return "{} not in bgp_addr_types".format(addr_type)
- for bgp_neighbor, peer_data in bgp_neighbors.items():
- for dest_link, peer_dict in peer_data["dest_link"].items():
- data = topo["routers"][bgp_neighbor]["links"]
+ if peer not in bgp_addr_type[addr_type]["unicast"]["neighbor"]:
+ return "{} not a peer of {} over {}".format(peer, dut, addr_type)
- if dest_link in data:
- neighbor_ip = data[dest_link][addr_type].split("/")[0]
+ nbr_links = topo["routers"][peer]["links"]
+ if dut not in nbr_links or addr_type not in nbr_links[dut]:
+ return "peer {} missing back link to {} over {}".format(peer, dut, addr_type)
- logger.info(
- "[DUT: {}]: Checking bgp graceful-restart"
- " show o/p {}".format(dut, neighbor_ip)
- )
+ neighbor_ip = nbr_links[dut][addr_type].split("/")[0]
- show_bgp_graceful_json = run_frr_cmd(
- rnode,
- "show bgp {} neighbor {} graceful-restart json".format(
- addr_type, neighbor_ip
- ),
- isjson=True,
- )
+ logger.info(
+ "[DUT: {}]: Checking bgp graceful-restart show o/p {} for {}".format(
+ dut, neighbor_ip, addr_family
+ )
+ )
- show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
+ show_bgp_graceful_json = run_frr_cmd(
+ rnode,
+ "show bgp {} neighbor {} graceful-restart json".format(
+ addr_type, neighbor_ip
+ ),
+ isjson=True,
+ )
- if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
- logger.info("Neighbor ip matched {}".format(neighbor_ip))
- else:
- errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
- return errormsg
+ show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
- if addr_family == "ipv4Unicast":
- if "ipv4Unicast" in show_bgp_graceful_json_out:
- logger.info("ipv4Unicast present for {} ".format(neighbor_ip))
- return True
- else:
- errormsg = "ipv4Unicast NOT present for {} ".format(neighbor_ip)
- return errormsg
+ if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
+ logger.info("Neighbor ip matched {}".format(neighbor_ip))
+ else:
+ errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
+ return errormsg
- elif addr_family == "ipv6Unicast":
- if "ipv6Unicast" in show_bgp_graceful_json_out:
- logger.info("ipv6Unicast present for {} ".format(neighbor_ip))
- return True
- else:
- errormsg = "ipv6Unicast NOT present for {} ".format(neighbor_ip)
- return errormsg
- else:
- errormsg = "Aaddress family: {} present for {} ".format(
- addr_family, neighbor_ip
- )
- return errormsg
+ if addr_family == "ipv4Unicast":
+ if "ipv4Unicast" in show_bgp_graceful_json_out:
+ logger.info("ipv4Unicast present for {} ".format(neighbor_ip))
+ return True
+ else:
+ errormsg = "ipv4Unicast NOT present for {} ".format(neighbor_ip)
+ return errormsg
+
+ elif addr_family == "ipv6Unicast":
+ if "ipv6Unicast" in show_bgp_graceful_json_out:
+ logger.info("ipv6Unicast present for {} ".format(neighbor_ip))
+ return True
+ else:
+ errormsg = "ipv6Unicast NOT present for {} ".format(neighbor_ip)
+ return errormsg
+ else:
+ errormsg = "Aaddress family: {} present for {} ".format(
+ addr_family, neighbor_ip
+ )
+ return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))