from lib.common_config import (
start_topology, stop_topology, write_test_header,
- write_test_footer, reset_config_on_routers
+ write_test_footer, reset_config_on_routers, create_static_routes,
+ verify_rib, verify_admin_distance_for_static_routes
)
from lib.topolog import logger
from lib.bgp import (
write_test_footer(tc_name)
+
+
+def test_static_routes(request):
+ """ Test to create and verify static routes. """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to create static routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "no_of_ip": 9,
+ "admin_distance": 100,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Api call to redistribute static routes
+ input_dict_1 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Verifying RIB routes
+ dut = 'r3'
+ protocol = 'bgp'
+ next_hop = '10.0.0.2'
+ result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop,
+ protocol=protocol)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_admin_distance_for_existing_static_routes(request):
+ """ Test to modify and verify admin distance for existing static routes."""
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "admin_distance": 10,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Verifying admin distance once modified
+ result = verify_admin_distance_for_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_advertise_network_using_network_command(request):
+ """ Test advertise networks using network command."""
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "20.0.0.0/32",
+ "no_of_network": 10
+ },
+ {
+ "network": "30.0.0.0/32",
+ "no_of_network": 10
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Verifying RIB routes
+ dut = 'r2'
+ protocol = "bgp"
+ result = verify_rib(tgen, 'ipv4', dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_clear_bgp_and_verify(request):
+ """
+ Created few static routes and verified all routes are learned via BGP
+ cleared BGP and verified all routes are intact
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # clear ip bgp
+ result = clear_bgp_and_verify(tgen, topo, 'r1')
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_bgp_with_loopback_interface(request):
+ """
+ Test BGP with loopback interface
+
+ Adding keys:value pair "dest_link": "lo" and "source_link": "lo"
+ peer dict of input json file for all router's creating config using
+ loopback interface. Once BGP neighboship is up then verifying BGP
+ convergence
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ for routerN in sorted(topo['routers'].keys()):
+ for bgp_neighbor in \
+ topo['routers'][routerN]['bgp']['address_family']['ipv4'][
+ 'unicast']['neighbor'].keys():
+
+ # Adding ['source_link'] = 'lo' key:value pair
+ topo['routers'][routerN]['bgp']['address_family']['ipv4'][
+ 'unicast']['neighbor'][bgp_neighbor]["dest_link"] = {
+ 'lo': {
+ "source_link": "lo",
+ }
+ }
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "1.0.2.17/32",
+ "next_hop": "10.0.0.2"
+ },
+ {
+ "network": "1.0.3.17/32",
+ "next_hop": "10.0.0.6"
+ }
+ ]
+ },
+ "r2": {
+ "static_routes": [{
+ "network": "1.0.1.17/32",
+ "next_hop": "10.0.0.1"
+ },
+ {
+ "network": "1.0.3.17/32",
+ "next_hop": "10.0.0.10"
+ }
+ ]
+ },
+ "r3": {
+ "static_routes": [{
+ "network": "1.0.1.17/32",
+ "next_hop": "10.0.0.5"
+ },
+ {
+ "network": "1.0.2.17/32",
+ "next_hop": "10.0.0.9"
+ },
+ {
+ "network": "1.0.4.17/32",
+ "next_hop": "10.0.0.14"
+ }
+ ]
+ },
+ "r4": {
+ "static_routes": [{
+ "network": "1.0.3.17/32",
+ "next_hop": "10.0.0.13"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Api call verify whether BGP is converged
+ result = verify_bgp_convergence(tgen, topo)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
if __name__ == '__main__':
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))
peer_uptime_before_clear_bgp = {}
# Verifying BGP convergence before bgp clear command
for retry in range(1, 11):
- show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
- isjson=True)
- logger.info(show_bgp_json)
- # Verifying output dictionary show_bgp_json is empty or not
- if not bool(show_bgp_json):
- errormsg = "BGP is not running"
- return errormsg
-
sleeptime = 2 * retry
if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
# Waiting for BGP to converge
" router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
return errormsg
+ show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+ logger.info(show_bgp_json)
+ # Verifying output dictionary show_bgp_json is empty or not
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
# To find neighbor ip type
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
for addr_type in bgp_addr_type.keys():
peer_uptime_after_clear_bgp = {}
# Verifying BGP convergence after bgp clear command
for retry in range(1, 11):
- show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
- isjson=True)
- # Verifying output dictionary show_bgp_json is empty or not
- if not bool(show_bgp_json):
- errormsg = "BGP is not running"
- return errormsg
-
sleeptime = 2 * retry
if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
# Waiting for BGP to converge
" router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
return errormsg
+ show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+ # Verifying output dictionary show_bgp_json is empty or not
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
# To find neighbor ip type
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
from collections import OrderedDict
from datetime import datetime
+from time import sleep
import StringIO
import os
import ConfigParser
logger.debug("Exiting lib API: create_prefix_lists()")
return result
+
+#############################################
+# Verification APIs
+#############################################
+def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
+ """
+ Data will be read from input_dict or input JSON file, API will generate
+ same prefixes, which were redistributed by either create_static_routes() or
+ advertise_networks_using_network_command() and do will verify next_hop and
+ each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
+ command o/p.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test, for which user wants to test the data
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default: static
+ * `protocol`[optional]: protocol, default = None
+
+ Usage
+ -----
+ # RIB can be verified for static routes OR network advertised using
+ network command. Following are input_dicts to create static routes
+ and advertise networks using network command. Any one of the input_dict
+ can be passed to verify_rib() to verify routes in DUT"s RIB.
+
+ # Creating static routes for r1
+ input_dict = {
+ "r1": {
+ "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
+ "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
+ }}
+ # Advertising networks using network command in router r1
+ input_dict = {
+ "r1": {
+ "advertise_networks": [{"start_ip": "20.0.0.0/32",
+ "no_of_network": 10},
+ {"start_ip": "30.0.0.0/32"}]
+ }}
+ # Verifying ipv4 routes in router r1 learned via BGP
+ dut = "r2"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_rib()")
+
+ router_list = tgen.routers()
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ # Verifying RIB routes
+ if addr_type == "ipv4":
+ if protocol:
+ command = "show ip route {} json".format(protocol)
+ else:
+ command = "show ip route json"
+ else:
+ if protocol:
+ command = "show ipv6 route {} json".format(protocol)
+ else:
+ command = "show ipv6 route json"
+
+ sleep(2)
+ logger.info("Checking router %s RIB:", router)
+ rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "No {} route found in rib of router {}..". \
+ format(protocol, router)
+ return errormsg
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+ st_found = False
+ nh_found = False
+ found_routes = []
+ missing_routes = []
+ for static_route in static_routes:
+ network = static_route["network"]
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 0
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ found_hops = [rib_r["ip"] for rib_r in
+ rib_routes_json[st_rt][0][
+ "nexthops"]]
+ for nh in next_hop:
+ nh_found = False
+ if nh and nh in found_hops:
+ nh_found = True
+ else:
+ errormsg = ("Nexthop {} is Missing for {}"
+ " route {} in RIB of router"
+ " {}\n".format(next_hop,
+ protocol,
+ st_rt, dut))
+
+ return errormsg
+ else:
+ missing_routes.append(st_rt)
+ if nh_found:
+ logger.info("Found next_hop %s for all routes in RIB of"
+ " router %s\n", next_hop, dut)
+
+ if not st_found and len(missing_routes) > 0:
+ errormsg = "Missing route in RIB of router {}, routes: " \
+ "{}\n".format(dut, missing_routes)
+ return errormsg
+
+ logger.info("Verified routes in router %s RIB, found routes"
+ " are: %s\n", dut, found_routes)
+
+ advertise_network = input_dict[routerInput].setdefault(
+ "advertise_networks", {})
+ if advertise_network:
+ found_routes = []
+ missing_routes = []
+ found = False
+ for advertise_network_dict in advertise_network:
+ start_ip = advertise_network_dict["network"]
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 0
+
+ # Generating IPs for verification
+ ip_list = generate_ips(start_ip, no_of_network)
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ if st_rt in rib_routes_json:
+ found = True
+ found_routes.append(st_rt)
+ else:
+ missing_routes.append(st_rt)
+
+ if not found and len(missing_routes) > 0:
+ errormsg = "Missing route in RIB of router {}, are: {}" \
+ " \n".format(dut, missing_routes)
+ return errormsg
+
+ logger.info("Verified routes in router %s RIB, found routes"
+ " are: %s", dut, found_routes)
+
+ logger.info("Exiting lib API: verify_rib()")
+ return True
+
+
+def verify_admin_distance_for_static_routes(tgen, input_dict):
+ """
+ API to verify admin distance for static routes as defined in input_dict/
+ input JSON by running show ip/ipv6 route json command.
+
+ Parameter
+ ---------
+ * `tgen` : topogen object
+ * `input_dict`: having details like - for which router and static routes
+ admin dsitance needs to be verified
+ Usage
+ -----
+ # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
+ 10.0.0.2 in router r1
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "admin_distance": 10,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = verify_admin_distance_for_static_routes(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_admin_distance_for_static_routes()")
+
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ for static_route in input_dict[router]["static_routes"]:
+ addr_type = validate_ip_address(static_route["network"])
+ # Command to execute
+ if addr_type == "ipv4":
+ command = "show ip route json"
+ else:
+ command = "show ipv6 route json"
+ show_ip_route_json = rnode.vtysh_cmd(command, isjson=True)
+
+ logger.info("Verifying admin distance for static route %s"
+ " under dut %s:", static_route, router)
+ network = static_route["network"]
+ next_hop = static_route["next_hop"]
+ admin_distance = static_route["admin_distance"]
+ route_data = show_ip_route_json[network][0]
+ if network in show_ip_route_json:
+ if route_data["nexthops"][0]["ip"] == next_hop:
+ if route_data["distance"] != admin_distance:
+ errormsg = ("Verification failed: admin distance"
+ " for static route {} under dut {},"
+ " found:{} but expected:{}".
+ format(static_route, router,
+ route_data["distance"],
+ admin_distance))
+ return errormsg
+ else:
+ logger.info("Verification successful: admin"
+ " distance for static route %s under"
+ " dut %s, found:%s", static_route,
+ router, route_data["distance"])
+
+ else:
+ errormsg = ("Static route {} not found in "
+ "show_ip_route_json for dut {}".
+ format(network, router))
+ return errormsg
+
+ logger.info("Exiting lib API: verify_admin_distance_for_static_routes()")
+ return True