summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/bgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/bgp.py')
-rw-r--r--tests/topotests/lib/bgp.py92
1 files changed, 90 insertions, 2 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 458ae4b054..3253fe6900 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -989,6 +989,14 @@ def __create_bgp_unicast_address_family(
if "no_allowas_in" in peer:
allow_as_in = peer["no_allowas_in"]
config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))
+
+ if "shutdown" in peer:
+ shut_val = peer["shutdown"]
+ if shut_val is True:
+ config_data.append("{} shutdown".format(neigh_cxt))
+ elif shut_val is False:
+ config_data.append("no {} shutdown".format(neigh_cxt))
+
if prefix_lists:
for prefix_list in prefix_lists:
name = prefix_list.setdefault("name", {})
@@ -2221,6 +2229,7 @@ def verify_bgp_attributes(
rmap_name=None,
input_dict=None,
seq_id=None,
+ vrf=None,
nexthop=None,
expected=True,
):
@@ -2275,7 +2284,10 @@ def verify_bgp_attributes(
logger.info("Verifying BGP set attributes for dut {}:".format(router))
for static_route in static_routes:
- cmd = "show bgp {} {} json".format(addr_type, static_route)
+ if vrf:
+ cmd = "show bgp vrf {} {} {} json".format(vrf, addr_type, static_route)
+ else:
+ cmd = "show bgp {} {} json".format(addr_type, static_route)
show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
dict_to_test = []
@@ -2821,7 +2833,6 @@ def verify_bgp_rib(
st_rt,
dut,
)
- return errormsg
else:
nh_found = True
@@ -4428,6 +4439,83 @@ def verify_evpn_routes(
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return False
+
+
+@retry(retry_timeout=10)
+def verify_bgp_bestpath(tgen, addr_type, input_dict):
+ """
+ Verifies bgp next hop values in best-path output
+
+ * `dut` : device under test
+ * `addr_type` : Address type ipv4/ipv6
+ * `input_dict`: having details like multipath and bestpath
+
+ Usage
+ -----
+ input_dict_1 = {
+ "r1": {
+ "ipv4" : {
+ "bestpath": "50.0.0.1",
+ "multipath": ["50.0.0.1", "50.0.0.2"],
+ "network": "100.0.0.0/24"
+ }
+ "ipv6" : {
+ "bestpath": "1000::1",
+ "multipath": ["1000::1", "1000::2"]
+ "network": "2000::1/128"
+ }
+ }
+ }
+
+ result = verify_bgp_bestpath(tgen, input_dict)
+
+ """
+
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ for dut in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying bgp bestpath and multipath " "routes:", dut)
+ result = False
+ for network_dict in input_dict[dut][addr_type]:
+ nw_addr = network_dict.setdefault("network", None)
+ vrf = network_dict.setdefault("vrf", None)
+ bestpath = network_dict.setdefault("bestpath", None)
+
+ if vrf:
+ cmd = "show bgp vrf {} {} {} bestpath json".format(
+ vrf, addr_type, nw_addr
+ )
+ else:
+ cmd = "show bgp {} {} bestpath json".format(addr_type, nw_addr)
+
+ data = run_frr_cmd(rnode, cmd, isjson=True)
+ route = data["paths"][0]
+
+ if "bestpath" in route:
+ if route["bestpath"]["overall"] is True:
+ _bestpath = route["nexthops"][0]["ip"]
+
+ if _bestpath != bestpath:
+ return (
+ "DUT:[{}] Bestpath do not match for"
+ " network: {}, Expected "
+ " {} as bgp bestpath found {}".format(
+ dut, nw_addr, bestpath, _bestpath
+ )
+ )
+
+ logger.info(
+ "DUT:[{}] Found expected bestpath: "
+ " {} for network: {}".format(dut, _bestpath, nw_addr)
+ )
+ result = True
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return result
+
+
def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
"""
This api is used to verify the tcp-mss value assigned to a neigbour of DUT