diff options
Diffstat (limited to 'tests/topotests/lib/bgp.py')
| -rw-r--r-- | tests/topotests/lib/bgp.py | 152 |
1 files changed, 149 insertions, 3 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index 556240bfb5..551483d718 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -989,6 +989,14 @@ def __create_bgp_unicast_address_family( if "no_allowas_in" in peer: allow_as_in = peer["no_allowas_in"] config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in)) + + if "shutdown" in peer: + shut_val = peer["shutdown"] + if shut_val is True: + config_data.append("{} shutdown".format(neigh_cxt)) + elif shut_val is False: + config_data.append("no {} shutdown".format(neigh_cxt)) + if prefix_lists: for prefix_list in prefix_lists: name = prefix_list.setdefault("name", {}) @@ -2221,6 +2229,7 @@ def verify_bgp_attributes( rmap_name=None, input_dict=None, seq_id=None, + vrf=None, nexthop=None, expected=True, ): @@ -2275,7 +2284,10 @@ def verify_bgp_attributes( logger.info("Verifying BGP set attributes for dut {}:".format(router)) for static_route in static_routes: - cmd = "show bgp {} {} json".format(addr_type, static_route) + if vrf: + cmd = "show bgp vrf {} {} {} json".format(vrf, addr_type, static_route) + else: + cmd = "show bgp {} {} json".format(addr_type, static_route) show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True) dict_to_test = [] @@ -2821,7 +2833,6 @@ def verify_bgp_rib( st_rt, dut, ) - return errormsg else: nh_found = True @@ -3050,7 +3061,12 @@ def verify_graceful_restart( if router != dut: continue - bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"] + try: + bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"] + except TypeError: + bgp_addr_type = topo["routers"][dut]["bgp"][0]["address_family"] + + # bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"] if addr_type in bgp_addr_type: if not check_address_types(addr_type): @@ -4428,3 +4444,133 @@ def verify_evpn_routes( logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return False + + +@retry(retry_timeout=10) +def verify_bgp_bestpath(tgen, addr_type, input_dict): + """ + Verifies bgp next hop values in best-path output + + * `dut` : device under test + * `addr_type` : Address type ipv4/ipv6 + * `input_dict`: having details like multipath and bestpath + + Usage + ----- + input_dict_1 = { + "r1": { + "ipv4" : { + "bestpath": "50.0.0.1", + "multipath": ["50.0.0.1", "50.0.0.2"], + "network": "100.0.0.0/24" + } + "ipv6" : { + "bestpath": "1000::1", + "multipath": ["1000::1", "1000::2"] + "network": "2000::1/128" + } + } + } + + result = verify_bgp_bestpath(tgen, input_dict) + + """ + + result = False + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + for dut in input_dict.keys(): + rnode = tgen.routers()[dut] + + logger.info("[DUT: %s]: Verifying bgp bestpath and multipath " "routes:", dut) + result = False + for network_dict in input_dict[dut][addr_type]: + nw_addr = network_dict.setdefault("network", None) + vrf = network_dict.setdefault("vrf", None) + bestpath = network_dict.setdefault("bestpath", None) + + if vrf: + cmd = "show bgp vrf {} {} {} bestpath json".format( + vrf, addr_type, nw_addr + ) + else: + cmd = "show bgp {} {} bestpath json".format(addr_type, nw_addr) + + data = run_frr_cmd(rnode, cmd, isjson=True) + route = data["paths"][0] + + if "bestpath" in route: + if route["bestpath"]["overall"] is True: + _bestpath = route["nexthops"][0]["ip"] + + if _bestpath != bestpath: + return ( + "DUT:[{}] Bestpath do not match for" + " network: {}, Expected " + " {} as bgp bestpath found {}".format( + dut, nw_addr, bestpath, _bestpath + ) + ) + + logger.info( + "DUT:[{}] Found expected bestpath: " + " {} for network: {}".format(dut, _bestpath, nw_addr) + ) + result = True + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return result + + +def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None): + """ + This api is used to verify the tcp-mss value assigned to a neigbour of DUT + + Parameters + ---------- + * `tgen` : topogen object + * `dut`: device under test + * `neighbour`:neigbout IP address + * `configured_tcp_mss`:The TCP-MSS value to be verified + * `vrf`:vrf + + Usage + ----- + result = verify_tcp_mss(tgen, dut,neighbour,configured_tcp_mss) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + rnode = tgen.routers()[dut] + if vrf: + cmd = "show bgp vrf {} neighbors {} json".format(vrf, neighbour) + else: + cmd = "show bgp neighbors {} json".format(neighbour) + + # Execute the command + show_vrf_stats = run_frr_cmd(rnode, cmd, isjson=True) + + # Verify TCP-MSS on router + logger.info("Verify that no core is observed") + if tgen.routers_have_failure(): + errormsg = "Core observed while running CLI: %s" % (cmd) + return errormsg + else: + if configured_tcp_mss == show_vrf_stats.get(neighbour).get( + "bgpTcpMssConfigured" + ): + logger.debug( + "Configured TCP - MSS Found: {}".format(sys._getframe().f_code.co_name) + ) + return True + else: + logger.debug( + "TCP-MSS Mismatch ,configured {} expecting {}".format( + show_vrf_stats.get(neighbour).get("bgpTcpMssConfigured"), + configured_tcp_mss, + ) + ) + return "TCP-MSS Mismatch" + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return False |
