diff options
Diffstat (limited to 'tests')
23 files changed, 7805 insertions, 0 deletions
diff --git a/tests/topotests/bgp-basic-functionality-topo1/__init__.py b/tests/topotests/bgp-basic-functionality-topo1/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/topotests/bgp-basic-functionality-topo1/__init__.py diff --git a/tests/topotests/bgp-basic-functionality-topo1/bgp_basic_functionality.json b/tests/topotests/bgp-basic-functionality-topo1/bgp_basic_functionality.json new file mode 100644 index 0000000000..c778ae4bed --- /dev/null +++ b/tests/topotests/bgp-basic-functionality-topo1/bgp_basic_functionality.json @@ -0,0 +1,172 @@ +{ + "ipv4base": "10.0.0.0", + "ipv4mask": 30, + "ipv6base": "fd00::", + "ipv6mask": 64, + "link_ip_start": { + "ipv4": "10.0.0.0", + "v4mask": 30, + "ipv6": "fd00::", + "v6mask": 64 + }, + "lo_prefix": { + "ipv4": "1.0.", + "v4mask": 32, + "ipv6": "2001:DB8:F::", + "v6mask": 128 + }, + "routers": { + "r1": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + }, + "r3": { + "dest_link": { + "r1": {} + } + } + } + } + } + } + } + }, + "r2": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + } + } + } + } + } + } + }, + "r3": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r4": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {} + } + }, + "r2": { + "dest_link": { + "r3": {} + } + }, + "r4": { + "dest_link": { + "r3": {} + } + } + } + } + } + } + } + }, + "r4": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "200", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r4": {} + } + } + } + } + } + } + } + } + } +} diff --git a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py new file mode 100755 index 0000000000..095ebe3344 --- /dev/null +++ b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py @@ -0,0 +1,595 @@ +#!/usr/bin/env python + +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, +# Inc. ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +Following tests are covered to test BGP basic functionality: + +Test steps +- Create topology (setup module) + Creating 4 routers topology, r1, r2, r3 are in IBGP and + r3, r4 are in EBGP +- Bring up topology +- Verify for bgp to converge +- Modify/Delete and verify router-id +- Modify and verify bgp timers +- Create and verify static routes +- Modify and verify admin distance for existing static routes +- Test advertise network using network command +- Verify clear bgp +- Test bgp convergence with loopback interface +- Test advertise network using network command +""" + +import os +import sys +import json +import time +import pytest +from copy import deepcopy + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, '../')) +sys.path.append(os.path.join(CWD, '../lib/')) + +# Required to instantiate the topology builder class. + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib.topogen import Topogen, get_topogen +from mininet.topo import Topo + +from lib.common_config import ( + start_topology, write_test_header, + write_test_footer, reset_config_on_routers, create_static_routes, + verify_rib, verify_admin_distance_for_static_routes +) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence, create_router_bgp, verify_router_id, + modify_as_number, verify_as_numbers, clear_bgp_and_verify, + verify_bgp_timers_and_functionality +) +from lib.topojson import build_topo_from_json, build_config_from_json + +# Reading the data from JSON File for topology creation +jsonFile = "{}/bgp_basic_functionality.json".format(CWD) +try: + with open(jsonFile, 'r') as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + + +class CreateTopo(Topo): + """ + Test BasicTopo - topology 1 + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + """Build function""" + tgen = get_topogen(self) + + # Building topology from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(CreateTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + global BGP_CONVERGENCE + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}". \ + format(BGP_CONVERGENCE) + + logger.info("Running setup_module() done") + + +def teardown_module(): + """Teardown the pytest environment""" + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info("Testsuite end time: {}". + format(time.asctime(time.localtime(time.time())))) + logger.info("=" * 40) + + +##################################################### +# +# Testcases +# +##################################################### + + +def test_modify_and_delete_router_id(request): + """ Test to modify, delete and verify router-id. """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Modify router id + input_dict = { + 'r1': { + "bgp": { + 'router_id': '12.12.12.12' + } + }, + 'r2': { + "bgp": { + 'router_id': '22.22.22.22' + } + }, + 'r3': { + "bgp": { + 'router_id': '33.33.33.33' + } + }, + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".\ + format(tc_name, result) + + # Verifying router id once modified + result = verify_router_id(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".\ + format(tc_name, result) + + # Delete router id + input_dict = { + 'r1': { + "bgp": { + 'del_router_id': True + } + }, + 'r2': { + "bgp": { + 'del_router_id': True + } + }, + 'r3': { + "bgp": { + 'del_router_id': True + } + }, + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Verifying router id once deleted + # Once router-id is deleted, highest interface ip should become + # router-id + result = verify_router_id(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +def test_bgp_config_with_4byte_as_number(request): + """ + Configure BGP with 4 byte ASN and verify it works fine + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + input_dict = { + "r1": { + "bgp": { + "local_as": 131079 + } + }, + "r2": { + "bgp": { + "local_as": 131079 + } + }, + "r3": { + "bgp": { + "local_as": 131079 + } + }, + "r4": { + "bgp": { + "local_as": 131080 + } + } + } + result = modify_as_number(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + result = verify_as_numbers(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +def test_bgp_timers_functionality(request): + """ + Test to modify bgp timers and verify timers functionality. + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to modfiy BGP timerse + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link":{ + "r1": { + "keepalivetimer": 60, + "holddowntimer": 180, + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, deepcopy(input_dict)) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Api call to clear bgp, so timer modification would take place + clear_bgp_and_verify(tgen, topo, 'r1') + + # Verifying bgp timers functionality + result = verify_bgp_timers_and_functionality(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + + + +def test_static_routes(request): + """ Test to create and verify static routes. """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to create static routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 9, + "admin_distance": 100, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Api call to redistribute static routes + input_dict_1 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + next_hop = '10.0.0.2' + result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop, + protocol=protocol) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +def test_admin_distance_for_existing_static_routes(request): + """ Test to modify and verify admin distance for existing static routes.""" + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "admin_distance": 10, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Verifying admin distance once modified + result = verify_admin_distance_for_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +def test_advertise_network_using_network_command(request): + """ Test advertise networks using network command.""" + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "20.0.0.0/32", + "no_of_network": 10 + }, + { + "network": "30.0.0.0/32", + "no_of_network": 10 + } + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Verifying RIB routes + dut = 'r2' + protocol = "bgp" + result = verify_rib(tgen, 'ipv4', dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +def test_clear_bgp_and_verify(request): + """ + Created few static routes and verified all routes are learned via BGP + cleared BGP and verified all routes are intact + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # clear ip bgp + result = clear_bgp_and_verify(tgen, topo, 'r1') + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +def test_bgp_with_loopback_interface(request): + """ + Test BGP with loopback interface + + Adding keys:value pair "dest_link": "lo" and "source_link": "lo" + peer dict of input json file for all router's creating config using + loopback interface. Once BGP neighboship is up then verifying BGP + convergence + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + for routerN in sorted(topo['routers'].keys()): + for bgp_neighbor in \ + topo['routers'][routerN]['bgp']['address_family']['ipv4'][ + 'unicast']['neighbor'].keys(): + + # Adding ['source_link'] = 'lo' key:value pair + topo['routers'][routerN]['bgp']['address_family']['ipv4'][ + 'unicast']['neighbor'][bgp_neighbor]["dest_link"] = { + 'lo': { + "source_link": "lo", + } + } + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + input_dict = { + "r1": { + "static_routes": [{ + "network": "1.0.2.17/32", + "next_hop": "10.0.0.2" + }, + { + "network": "1.0.3.17/32", + "next_hop": "10.0.0.6" + } + ] + }, + "r2": { + "static_routes": [{ + "network": "1.0.1.17/32", + "next_hop": "10.0.0.1" + }, + { + "network": "1.0.3.17/32", + "next_hop": "10.0.0.10" + } + ] + }, + "r3": { + "static_routes": [{ + "network": "1.0.1.17/32", + "next_hop": "10.0.0.5" + }, + { + "network": "1.0.2.17/32", + "next_hop": "10.0.0.9" + }, + { + "network": "1.0.4.17/32", + "next_hop": "10.0.0.14" + } + ] + }, + "r4": { + "static_routes": [{ + "network": "1.0.3.17/32", + "next_hop": "10.0.0.13" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Api call verify whether BGP is converged + result = verify_bgp_convergence(tgen, topo) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +if __name__ == '__main__': + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/bgp-path-attributes-topo1/__init__.py b/tests/topotests/bgp-path-attributes-topo1/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/topotests/bgp-path-attributes-topo1/__init__.py diff --git a/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json b/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json new file mode 100644 index 0000000000..15b7ec13be --- /dev/null +++ b/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json @@ -0,0 +1,220 @@ +{ + "ipv4base":"10.0.0.0", + "ipv4mask":30, + "ipv6base":"fd00::", + "ipv6mask":64, + "link_ip_start":{"ipv4":"10.0.0.0", "v4mask":30, "ipv6":"fd00::", "v6mask":64}, + "lo_prefix":{"ipv4":"1.0.", "v4mask":32, "ipv6":"2001:DB8:F::", "v6mask":128}, + "routers":{ + "r1":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r2":{"ipv4":"auto", "ipv6":"auto"}, + "r3":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"555", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + }, + "r3": { + "dest_link": { + "r1": {} + } + } + } + } + } + } + } + }, + "r2":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r1": {"ipv4": "auto", "ipv6": "auto"}, + "r3": {"ipv4": "auto", "ipv6": "auto"}, + "r4-link1": {"ipv4": "auto", "ipv6": "auto"}, + "r4-link2": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp":{ + "local_as":"555", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + }, + "r4": { + "dest_link": { + "r2-link1": {} + } + } + } + } + } + } + } + }, + "r3":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r1":{"ipv4":"auto", "ipv6":"auto"}, + "r2":{"ipv4":"auto", "ipv6":"auto"}, + "r5":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"555", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {} + } + }, + "r2": { + "dest_link": { + "r3": {} + } + }, + "r5": { + "dest_link": { + "r3": {} + } + } + } + } + } + } + } + }, + "r4":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r2-link1": {"ipv4": "auto", "ipv6": "auto"}, + "r2-link2": {"ipv4": "auto", "ipv6": "auto"}, + "r6": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp": { + "local_as": "666", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r4-link1": {} + } + }, + "r6": { + "dest_link": { + "r4": {} + } + } + } + } + } + } + } + }, + "r5":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r3": {"ipv4": "auto", "ipv6": "auto"}, + "r7": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp":{ + "local_as":"666", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r5": {} + } + }, + "r7": { + "dest_link": { + "r5": {} + } + } + } + } + } + } + } + }, + "r6":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r4": {"ipv4": "auto", "ipv6": "auto"}, + "r7": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp":{ + "local_as":"777", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r6": {} + } + }, + "r7": { + "dest_link": { + "r6": {} + } + } + } + } + } + } + } + }, + "r7":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"}, + "r5": {"ipv4": "auto", "ipv6": "auto"}, + "r6": {"ipv4": "auto", "ipv6": "auto"} + }, + "bgp":{ + "local_as":"888", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r5": { + "dest_link": { + "r7": {} + } + }, + "r6": { + "dest_link": { + "r7": {} + } + } + } + } + } + } + } + } + } +} diff --git a/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py new file mode 100755 index 0000000000..abd6b396d1 --- /dev/null +++ b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py @@ -0,0 +1,1078 @@ +#!/usr/bin/env python + +# +# Modified work Copyright (c) 2019 by VMware, Inc. ("VMware") +# Original work Copyright (c) 2018 by Network Device Education +# Foundation, Inc. ("NetDEF") +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +Following tests are covered to test AS-Path functionality: + +Setup module: +- Create topology (setup module) +- Bring up topology +- Verify BGP convergence + +Test cases: +1. Test next_hop attribute and verify best path is installed as per + reachable next_hop +2. Test aspath attribute and verify best path is installed as per + shortest AS-Path +3. Test localpref attribute and verify best path is installed as per + shortest local-preference +4. Test weight attribute and and verify best path is installed as per + highest weight +5. Test origin attribute and verify best path is installed as per + IGP>EGP>INCOMPLETE rule +6. Test med attribute and verify best path is installed as per lowest + med value +7. Test admin distance and verify best path is installed as per lowest + admin distance + +Teardown module: +- Bring down the topology +- stop routers + +""" + +import os +import sys +import pdb +import json +import time +import inspect +import ipaddress +from time import sleep +import pytest + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from mininet.topo import Topo +from lib import topotest +from lib.topogen import Topogen, TopoRouter, get_topogen + +# Required to instantiate the topology builder class. +from lib.common_config import ( + start_topology, write_test_header, + write_test_footer, reset_config_on_routers, + verify_rib, create_static_routes, + create_prefix_lists, verify_prefix_lists, + create_route_maps +) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence, create_router_bgp, + clear_bgp_and_verify, verify_best_path_as_per_bgp_attribute, + verify_best_path_as_per_admin_distance +) +from lib.topojson import build_topo_from_json, build_config_from_json + +# Reading the data from JSON File for topology creation +jsonFile = "{}/bgp_path_attributes.json".format(CWD) + +try: + with open(jsonFile, "r") as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + + +#### +class CreateTopo(Topo): + """ + Test CreateTopo - topology 1 + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + "Build function" + tgen = get_topogen(self) + + # Building topology and configuration from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: %s", testsuite_run_time) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(CreateTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Checking BGP convergence + result = verify_bgp_convergence(tgen, topo) + assert result is True, ("setup_module :Failed \n Error:" + " {}".format(result)) + + logger.info("Running setup_module() done") + + +def teardown_module(): + """ + Teardown the pytest environment + """ + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info("Testsuite end time: %s", + time.asctime(time.localtime(time.time()))) + logger.info("=" * 40) + + +##################################################### +## +## Testcases +## +##################################################### +def test_next_hop_attribute(request): + """ + Verifying route are not getting installed in, as next_hop is + unreachable, Making next hop reachable using next_hop_self + command and verifying routes are installed. + """ + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r7": { + "bgp":{ + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r1" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + try: + assert result is True + except AssertionError: + logger.info("Expected behaviour: %s", result) + + # Configure next-hop-self to bgp neighbor + input_dict_1 = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r1" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_aspath_attribute(request): + " Verifying AS_PATH attribute functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r7": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying best path + dut = "r1" + attribute = "aspath" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_localpref_attribute(request): + " Verifying LOCAL PREFERENCE attribute functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r7": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create Prefix list + input_dict_2 = { + "r2": { + "prefix_lists": { + "ipv4": { + "pf_ls_1": [{ + "seqid": 10, + "network": "200.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + input_dict_3 = { + "r2": { + "route_maps": { + "RMAP_LOCAL_PREF": [{ + "action": "permit", + "match": { + "ipv4": { + "prefix_lists": "pf_ls_1" + } + }, + "set": { + "localpref": 1000 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "route_maps": [ + {"name": "RMAP_LOCAL_PREF", + "direction": "in"} + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying best path + dut = "r1" + attribute = "localpref" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_weight_attribute(request): + " Verifying WEIGHT attribute functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r7": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create Prefix list + input_dict_2 = { + "r1": { + "prefix_lists": { + "ipv4": { + "pf_ls_1": [{ + "seqid": 10, + "network": "200.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + input_dict_3 = { + "r1": { + "route_maps": { + "RMAP_WEIGHT": [{ + "action": "permit", + "match": { + "ipv4": { + "prefix_lists": "pf_ls_1" + } + }, + "set": { + "weight": 500 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": { + "route_maps": [ + {"name": "RMAP_WEIGHT", + "direction": "in"} + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying best path + dut = "r1" + attribute = "weight" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_origin_attribute(request): + " Verifying ORIGIN attribute functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r4": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r5": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to create static routes + input_dict_3 = { + "r5": { + "static_routes": [ + { + "network": "200.50.2.0/32", + "next_hop": "10.0.0.26" + }, + { + "network": "200.60.2.0/32", + "next_hop": "10.0.0.26" + } + ] + } + } + result = create_static_routes(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + + # Configure next-hop-self to bgp neighbor + + # Verifying best path + dut = "r1" + attribute = "origin" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_med_attribute(request): + " Verifying MED attribute functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r4": { + "bgp":{ + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to advertise networks + + + # Configure next-hop-self to bgp neighbor + + + # Create Prefix list + input_dict_3 = { + "r2": { + "prefix_lists": { + "ipv4": { + "pf_ls_r2": [{ + "seqid": 10, + "network": "200.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + }, + "r3": { + "prefix_lists": { + "ipv4": { + "pf_ls_r3": [{ + "seqid": 10, + "network": "200.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create route map + input_dict_3 = { + "r2": { + "route_maps": { + "RMAP_MED_R2": [{ + "action": "permit", + "match": { + "ipv4": { + "prefix_lists": "pf_ls_r2" + } + }, + "set": { + "med": 100 + } + }] + } + }, + "r3": { + "route_maps": { + "RMAP_MED_R3": [{ + "action": "permit", + "match": { + "ipv4": { + "prefix_lists": "pf_ls_r3" + } + }, + "set": { + "med": 10 + } + }] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure neighbor for route map + input_dict_4 = { + "r5": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r2-link1": { + "route_maps": [ + {"name": "RMAP_MED_R2", + "direction": "in"} + ] + } + } + }, + "r1": { + "dest_link": { + "r2": {"next_hop_self": True} + } + } + } + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {"next_hop_self": True} + } + }, + "r5": { + "dest_link": { + "r3": { + "route_maps": [ + {"name": "RMAP_MED_R3", + "direction": "in"} + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying best path + dut = "r1" + attribute = "med" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + logger.info("Testcase %s :Passed \n", tc_name) + + # Uncomment next line for debugging + # tgen.mininet_cli() + + +def test_admin_distance(request): + " Verifying admin distance functionality" + + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to create static routes + input_dict = { + "r2": { + "static_routes": [ + { + "network": "200.50.2.0/32", + "admin_distance": 80, + "next_hop": "10.0.0.14" + }, + { + "network": "200.50.2.0/32", + "admin_distance": 60, + "next_hop": "10.0.0.18" + } + ] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + input_dict_2 = { + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying best path + dut = "r1" + attribute = "admin_distance" + result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, + input_dict, attribute) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/bgp-prefix-list-topo1/__init__.py b/tests/topotests/bgp-prefix-list-topo1/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/topotests/bgp-prefix-list-topo1/__init__.py diff --git a/tests/topotests/bgp-prefix-list-topo1/prefix_lists.json b/tests/topotests/bgp-prefix-list-topo1/prefix_lists.json new file mode 100644 index 0000000000..3bb07ad994 --- /dev/null +++ b/tests/topotests/bgp-prefix-list-topo1/prefix_lists.json @@ -0,0 +1,123 @@ +{ + "address_types": ["ipv4"], + "ipv4base":"10.0.0.0", + "ipv4mask":30, + "ipv6base":"fd00::", + "ipv6mask":64, + "link_ip_start":{"ipv4":"10.0.0.0", "v4mask":30, "ipv6":"fd00::", "v6mask":64}, + "lo_prefix":{"ipv4":"1.0.", "v4mask":32, "ipv6":"2001:DB8:F::", "v6mask":128}, + "routers":{ + "r1":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback", "add_static_route":"yes"}, + "r2":{"ipv4":"auto", "ipv6":"auto"}, + "r3":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + }, + "r3": { + "dest_link": { + "r1": {} + } + } + } + } + } + } + } + }, + "r2":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback", "add_static_route":"yes"}, + "r1":{"ipv4":"auto", "ipv6":"auto"}, + "r3":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + } + } + } + } + } + } + }, + "r3":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback", "add_static_route":"yes"}, + "r1":{"ipv4":"auto", "ipv6":"auto"}, + "r2":{"ipv4":"auto", "ipv6":"auto"}, + "r4":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": {} + } + }, + "r2": { + "dest_link": { + "r3": {} + } + }, + "r4": { + "dest_link": { + "r3": {} + } + } + } + } + } + } + } + }, + "r4":{ + "links":{ + "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback", "add_static_route":"yes"}, + "r3":{"ipv4":"auto", "ipv6":"auto"} + }, + "bgp":{ + "local_as":"200", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r4": {} + } + } + } + } + } + } + } + } + } +} diff --git a/tests/topotests/bgp-prefix-list-topo1/test_prefix_lists.py b/tests/topotests/bgp-prefix-list-topo1/test_prefix_lists.py new file mode 100755 index 0000000000..25a346f20d --- /dev/null +++ b/tests/topotests/bgp-prefix-list-topo1/test_prefix_lists.py @@ -0,0 +1,1450 @@ +#!/usr/bin/python + +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, +# Inc. ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +Following tests are covered to test prefix-list functionality: + +Test steps +- Create topology (setup module) + Creating 4 routers topology, r1, r2, r3 are in IBGP and + r3, r4 are in EBGP +- Bring up topology +- Verify for bgp to converge + +IP prefix-list tests +- Test ip prefix-lists IN permit +- Test ip prefix-lists OUT permit +- Test ip prefix-lists IN deny and permit any +- Test delete ip prefix-lists +- Test ip prefix-lists OUT deny and permit any +- Test modify ip prefix-lists IN permit to deny +- Test modify ip prefix-lists IN deny to permit +- Test modify ip prefix-lists OUT permit to deny +- Test modify prefix-lists OUT deny to permit +- Test ip prefix-lists implicit deny +""" + +import sys +import json +import time +import os +import pytest + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from mininet.topo import Topo +from lib.topogen import Topogen, get_topogen + +# Import topoJson from lib, to create topology and initial configuration +from lib.common_config import ( + start_topology, write_test_header, + write_test_footer, reset_config_on_routers, + verify_rib, create_static_routes, + create_prefix_lists, verify_prefix_lists +) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence, create_router_bgp, + clear_bgp_and_verify +) +from lib.topojson import build_topo_from_json, build_config_from_json + +# Reading the data from JSON File for topology creation +jsonFile = "{}/prefix_lists.json".format(CWD) + +try: + with open(jsonFile, "r") as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + +# Global variables +bgp_convergence = False + + +class BGPPrefixListTopo(Topo): + """ + Test BGPPrefixListTopo - topology 1 + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + "Build function" + tgen = get_topogen(self) + + # Building topology from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("="*40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(BGPPrefixListTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + # Checking BGP convergence + global BGP_CONVERGENCE + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Api call verify whether BGP is converged + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, ("setup_module :Failed \n Error:" + " {}".format(BGP_CONVERGENCE)) + + logger.info("Running setup_module() done") + + +def teardown_module(mod): + """ + Teardown the pytest environment + + * `mod`: module name + """ + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info("Testsuite end time: {}". + format(time.asctime(time.localtime(time.time())))) + logger.info("="*40) + +##################################################### +# +# Tests starting +# +##################################################### + + +def test_ip_prefix_lists_in_permit(request): + """ + Create ip prefix list and test permit prefixes IN direction + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Create Static routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "20.0.20.1/32", + "no_of_ip": 1, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create ip prefix list + input_dict_2 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [{ + "seqid": 10, + "network": "any", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure bgp neighbor with prefix list + input_dict_3 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "prefix_lists": [ + { + "name": "pf_list_1", + "direction": "in" + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r3" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_ip_prefix_lists_out_permit(request): + """ + Create ip prefix list and test permit prefixes out direction + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Create Static routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 1, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create Static routes + input_dict_1 = { + "r1": { + "static_routes": [{ + "network": "20.0.20.1/32", + "no_of_ip": 1, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + input_dict_5 = { + "r3": { + "static_routes": [{ + "network": "10.0.0.2/30", + "no_of_ip": 1, + "next_hop": "10.0.0.9" + }] + } + } + result = create_static_routes(tgen, input_dict_5) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + + # Create ip prefix list + input_dict_2 = { + "r1": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [{ + "seqid": 10, + "network": "20.0.20.1/32", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure prefix list to bgp neighbor + # Configure bgp neighbor with prefix list + input_dict_3 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r3": { + "dest_link": { + "r1": { + "prefix_lists": [ + { + "name": "pf_list_1", + "direction": "out" + } + ] + } + } + } + }, + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r3" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict_1, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + write_test_footer(tc_name) + + +def test_ip_prefix_lists_in_deny_and_permit_any(request): + """ + Create ip prefix list and test permit/deny prefixes IN direction + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Create Static Routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 1, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + # Create ip prefix list + input_dict_2 = { + "r1": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [ + { + "seqid": "10", + "network": "10.0.20.1/32", + "action": "deny" + }, + { + "seqid": "11", + "network": "any", + "action": "permit" + } + ] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure bgp neighbor with prefix list + input_dict_3 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "prefix_lists": [ + { + "name": "pf_list_1", + "direction": "in" + } + ] + } + } + } + } + } + } + } + } + } + } + # Configure prefix list to bgp neighbor + result = create_router_bgp(tgen, topo, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r3" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_delete_prefix_lists(request): + """ + Delete ip prefix list + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Create ip prefix list + input_dict_2 = { + "r1": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [ + { + "seqid": "10", + "network": "10.0.20.1/32", + "action": "deny" + } + ] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + result = verify_prefix_lists(tgen, input_dict_2) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + logger.info(result) + + # Delete prefix list + input_dict_2 = { + "r1": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [ + { + "seqid": "10", + "network": "10.0.20.1/32", + "action": "deny", + "delete": True + } + ] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + result = verify_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_ip_prefix_lists_out_deny_and_permit_any(request): + """ + Create ip prefix list and test deny/permit any prefixes OUT direction + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Create Static Routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 9, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create Static Routes + input_dict_1 = { + "r2": { + "static_routes": [{ + "network": "20.0.20.1/32", + "no_of_ip": 9, + "next_hop": "10.0.0.1" + }] + } + } + result = create_static_routes(tgen, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + + # Create ip prefix list + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [ + { + "seqid": "10", + "network": "10.0.0.0/8", + "le": "32", + "action": "deny" + }, + { + "seqid": "11", + "network": "any", + "action": "permit" + } + ] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure prefix list to bgp neighbor + input_dict_4 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r3": { + "prefix_lists": [ + { + "name": "pf_list_1", + "direction": "out" + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r4" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict_1, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r4" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_modify_prefix_lists_in_permit_to_deny(request): + """ + Modify ip prefix list and test permit to deny prefixes IN direction + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Create Static Routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 9, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + + # Create ip prefix list + input_dict_2 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [{ + "seqid": "10", + "network": "10.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure prefix list to bgp neighbor + input_dict_3 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link":{ + "r3": { + "prefix_lists": [ + { + "name": "pf_list_1", + "direction": "in" + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r3" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Modify prefix list + input_dict_1 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [ + { + "seqid": "10", + "network": "10.0.0.0/8", + "le": "32", + "action": "deny" + }, + { + "seqid": "11", + "network": "any", + "action": "permit" + } + ] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to clear bgp, so config changes would be reflected + dut = "r3" + result = clear_bgp_and_verify(tgen, topo, dut) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r3" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_modify_prefix_lists_in_deny_to_permit(request): + """ + Modify ip prefix list and test deny to permit prefixes IN direction + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Create Static Routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 9, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + + # Create ip prefix list + input_dict_1 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [ + { + "seqid": "10", + "network": "10.0.0.0/8", + "le": "32", + "action": "deny" + }, + { + "seqid": "11", + "network": "any", + "action": "permit" + } + ] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure prefix list to bgp neighbor + input_dict_2 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r3": { + "prefix_lists": [ + { + "name": "pf_list_1", + "direction": "in" + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r3" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Modify ip prefix list + input_dict_1 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [{ + "seqid": "10", + "network": "10.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + + } + result = create_prefix_lists(tgen, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to clear bgp, so config changes would be reflected + dut = "r3" + result = clear_bgp_and_verify(tgen, topo, dut) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r3" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_modify_prefix_lists_out_permit_to_deny(request): + """ + Modify ip prefix list and test permit to deny prefixes OUT direction + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Create Static Routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 9, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + + # Create ip prefix list + input_dict_1 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [{ + "seqid": "10", + "network": "10.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + + } + result = create_prefix_lists(tgen, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure prefix list to bgp neighbor + input_dict_2 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r3": { + "prefix_lists": [ + { + "name": "pf_list_1", + "direction": "out" + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r4" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Modify ip prefix list + input_dict_1 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [ + { + "seqid": "10", + "network": "10.0.0.0/8", + "le": "32", + "action": "deny" + }, + { + "seqid": "11", + "network": "any", + "action": "permit" + } + ] + } + } + } + + } + result = create_prefix_lists(tgen, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to clear bgp, so config changes would be reflected + dut = "r3" + result = clear_bgp_and_verify(tgen, topo, dut) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r4" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_modify_prefix_lists_out_deny_to_permit(request): + """ + Modify ip prefix list and test deny to permit prefixes OUT direction + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Create Static Routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 9, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + # Create ip prefix list + input_dict_1 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [ + { + "seqid": "10", + "network": "10.0.0.0/8", + "le": "32", + "action": "deny" + }, + { + "seqid": "11", + "network": "any", + "action": "permit" + } + ] + } + } + } + + } + result = create_prefix_lists(tgen, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure prefix list to bgp neighbor + input_dict_2 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link":{ + "r3": { + "prefix_lists": [ + { + "name": "pf_list_1", + "direction": "out" + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r4" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Modify ip prefix list + input_dict_1 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [{ + "seqid": "10", + "network": "10.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + + } + result = create_prefix_lists(tgen, input_dict_1) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to clear bgp, so config changes would be reflected + dut = "r3" + result = clear_bgp_and_verify(tgen, topo, dut) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r4" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +def test_ip_prefix_lists_implicit_deny(request): + """ + Create ip prefix list and test implicit deny + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip("skipped because of BGP Convergence failure") + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Create Static Routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 9, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Create Static Routes + input_dict_1 = { + "r2": { + "static_routes": [{ + "network": "20.0.20.1/32", + "no_of_ip": 9, + "next_hop": "10.0.0.1" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Api call to redistribute static routes + # Create ip prefix list + input_dict_3 = { + "r3": { + "prefix_lists": { + "ipv4": { + "pf_list_1": [{ + "seqid": "10", + "network": "10.0.0.0/8", + "le": "32", + "action": "permit" + }] + } + } + } + + } + result = create_prefix_lists(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Configure prefix list to bgp neighbor + input_dict_4 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + }, + "r2": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + }, + "r3": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r4": { + "dest_link": { + "r3": { + "prefix_lists": [ + { + "name": "pf_list_1", + "direction": "out" + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r4" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + # Verifying RIB routes + dut = "r4" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict_1, protocol=protocol) + assert result is not True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result) + + write_test_footer(tc_name) + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/example-topojson-test/__init__.py b/tests/topotests/example-topojson-test/__init__.py new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/topotests/example-topojson-test/__init__.py diff --git a/tests/topotests/example-topojson-test/test_topo_json_multiple_links/__init__.py b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/__init__.py new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/__init__.py diff --git a/tests/topotests/example-topojson-test/test_topo_json_multiple_links/example_topojson_multiple_links.json b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/example_topojson_multiple_links.json new file mode 100644 index 0000000000..3968348b1f --- /dev/null +++ b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/example_topojson_multiple_links.json @@ -0,0 +1,152 @@ +{ + "ipv4base": "10.0.0.0", + "ipv4mask": 30, + "ipv6base": "fd00::", + "ipv6mask": 64, + "link_ip_start": { + "ipv4": "10.0.0.0", + "v4mask": 30, + "ipv6": "fd00::", + "v6mask": 64 + }, + "lo_prefix": { + "ipv4": "1.0.", + "v4mask": 32, + "ipv6": "2001:DB8:F::", + "v6mask": 128 + }, + "routers": { + "r1": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2-link1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r2-link2": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1-link1": {} + } + } + } + } + } + } + } + }, + "r2": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1-link1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r1-link2": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3-link1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3-link2": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + { + "redist_type": "static" + } + ], + "neighbor": { + "r1": { + "dest_link": { + "r2-link1": {} + } + }, + "r3": { + "dest_link": { + "r2-link1": {} + } + } + } + } + } + } + }, + "static_routes": [ + { + "network": "100.0.20.1/32", + "no_of_ip": 9, + "admin_distance": 100, + "next_hop": "10.0.0.1" + } + ] + }, + "r3": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2-link1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r2-link2": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3-link1": {} + } + } + } + } + } + } + }, + "static_routes": [ + { + "network": "10.0.0.1/30", + "next_hop": "10.0.0.5" + } + ] + } + } +} + diff --git a/tests/topotests/example-topojson-test/test_topo_json_multiple_links/test_example_topojson_multiple_links.py b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/test_example_topojson_multiple_links.py new file mode 100755 index 0000000000..8e794b9946 --- /dev/null +++ b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/test_example_topojson_multiple_links.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python + +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF") +# in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +<example>.py: Test <example tests>. +""" + +import os +import sys +import json +import time +import inspect +import pytest + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, '../')) +sys.path.append(os.path.join(CWD, '../../')) + +# pylint: disable=C0413 +from lib.topogen import Topogen, get_topogen + +# Required to instantiate the topology builder class. +from mininet.topo import Topo + +# Import topoJson from lib, to create topology and initial configuration +from lib.common_config import ( + start_topology, write_test_header, + write_test_footer, verify_rib +) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence +) +from lib.topojson import build_topo_from_json, build_config_from_json + +# Reading the data from JSON File for topology and configuration creation +jsonFile = "{}/example_topojson_multiple_links.json".format(CWD) +try: + with open(jsonFile, 'r') as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + +# Global variables +bgp_convergence = False +input_dict = {} + + +class TemplateTopo(Topo): + """ + Test topology builder + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + "Build function" + tgen = get_topogen(self) + + # This function only purpose is to create topology + # as defined in input json file. + # + # Example + # + # Creating 2 routers having 2 links in between, + # one is used to establised BGP neighborship + + # Building topology from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(TemplateTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # This function only purpose is to create configuration + # as defined in input json file. + # + # Example + # + # Creating configuration defined in input JSON + # file, example, BGP config, interface config, static routes + # config, prefix list config + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + logger.info("Running setup_module() done") + + +def teardown_module(mod): + """ + Teardown the pytest environment + + * `mod`: module name + """ + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + +def test_bgp_convergence(request): + " Test BGP daemon convergence " + + tgen = get_topogen() + global bgp_convergence + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Api call verify whether BGP is converged + bgp_convergence = verify_bgp_convergence(tgen, topo) + assert bgp_convergence is True, "test_bgp_convergence failed.. \n" \ + " Error: {}".format(bgp_convergence) + + logger.info("BGP is converged successfully \n") + write_test_footer(tc_name) + + +def test_static_routes(request): + " Test to create and verify static routes. " + + tgen = get_topogen() + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Static routes are created as part of initial configuration, + # verifying RIB + dut = 'r3' + protocol = 'bgp' + next_hop = '10.0.0.1' + input_dict = {"r1": topo["routers"]["r1"]} + + # Uncomment below to debug + # tgen.mininet_cli() + result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +if __name__ == '__main__': + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link/__init__.py b/tests/topotests/example-topojson-test/test_topo_json_single_link/__init__.py new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/topotests/example-topojson-test/test_topo_json_single_link/__init__.py diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link/example_topojson.json b/tests/topotests/example-topojson-test/test_topo_json_single_link/example_topojson.json new file mode 100644 index 0000000000..629d2d6d78 --- /dev/null +++ b/tests/topotests/example-topojson-test/test_topo_json_single_link/example_topojson.json @@ -0,0 +1,153 @@ +{ + "ipv4base": "10.0.0.0", + "ipv4mask": 30, + "ipv6base": "fd00::", + "ipv6mask": 64, + "link_ip_start": { + "ipv4": "10.0.0.0", + "v4mask": 30, + "ipv6": "fd00::", + "v6mask": 64 + }, + "lo_prefix": { + "ipv4": "1.0.", + "v4mask": 32, + "ipv6": "2001:DB8:F::", + "v6mask": 128 + }, + "routers": { + "r1": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + }, + "r3": { + "dest_link": { + "r1": {} + } + } + } + } + } + } + } + }, + "r2": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + { + "redist_type": "static" + } + ], + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + } + } + } + } + } + }, + "static_routes": [ + { + "network": "100.0.20.1/32", + "no_of_ip": 9, + "admin_distance": 100, + "next_hop": "10.0.0.1" + } + ] + }, + "r3": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": {} + } + }, + "r1": { + "dest_link": { + "r3": {} + } + } + } + } + } + } + }, + "static_routes": [ + { + "network": "10.0.0.1/30", + "next_hop": "10.0.0.5" + } + ] + } + } +} diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link/test_example_topojson.py b/tests/topotests/example-topojson-test/test_topo_json_single_link/test_example_topojson.py new file mode 100755 index 0000000000..315c7b3f2d --- /dev/null +++ b/tests/topotests/example-topojson-test/test_topo_json_single_link/test_example_topojson.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python + +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF") +# in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +<example>.py: Test <example tests>. +""" + +import os +import sys +import time +import json +import pytest + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, '../')) +sys.path.append(os.path.join(CWD, '../../')) + +# pylint: disable=C0413 +from lib.topogen import Topogen, get_topogen + +# Required to instantiate the topology builder class. +from mininet.topo import Topo + +# Import topoJson from lib, to create topology and initial configuration +from lib.common_config import ( + start_topology, write_test_header, + write_test_footer, verify_rib +) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence +) +from lib.topojson import build_topo_from_json, build_config_from_json + +# Reading the data from JSON File for topology and configuration creation +jsonFile = "{}/example_topojson.json".format(CWD) + +try: + with open(jsonFile, 'r') as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + +# Global variables +bgp_convergence = False +input_dict = {} + +class TemplateTopo(Topo): + """ + Test topology builder + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + "Build function" + tgen = get_topogen(self) + + # This function only purpose is to create topology + # as defined in input json file. + # + # Example + # + # Creating 2 routers having single links in between, + # which is used to establised BGP neighborship + + # Building topology from json file + build_topo_from_json(tgen, topo) + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("="*40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(TemplateTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # This function only purpose is to create configuration + # as defined in input json file. + # + # Example + # + # Creating configuration defined in input JSON + # file, example, BGP config, interface config, static routes + # config, prefix list config + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + logger.info("Running setup_module() done") + +def teardown_module(mod): + """ + Teardown the pytest environment + + * `mod`: module name + """ + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + +def test_bgp_convergence(request): + " Test BGP daemon convergence " + + tgen = get_topogen() + global bgp_convergence + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Api call verify whether BGP is converged + bgp_convergence = verify_bgp_convergence(tgen, topo) + assert bgp_convergence is True, "test_bgp_convergence failed.. \n"\ + " Error: {}".format(bgp_convergence) + + logger.info("BGP is converged successfully \n") + write_test_footer(tc_name) + + +def test_static_routes(request): + " Test to create and verify static routes. " + + tgen = get_topogen() + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Static routes are created as part of initial configuration, + # verifying RIB + dut = 'r3' + next_hop = '10.0.0.1' + input_dict = {"r1": topo["routers"]["r1"]} + + # Uncomment below to debug + # tgen.mininet_cli() + result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +if __name__ == '__main__': + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/__init__.py b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/__init__.py new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/__init__.py diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/example_topojson.json b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/example_topojson.json new file mode 100644 index 0000000000..c76c6264be --- /dev/null +++ b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/example_topojson.json @@ -0,0 +1,161 @@ +{ + "ipv4base": "10.0.0.0", + "ipv4mask": 30, + "ipv6base": "fd00::", + "ipv6mask": 64, + "link_ip_start": { + "ipv4": "10.0.0.0", + "v4mask": 30, + "ipv6": "fd00::", + "v6mask": 64 + }, + "lo_prefix": { + "ipv4": "1.0.", + "v4mask": 32, + "ipv6": "2001:DB8:F::", + "v6mask": 128 + }, + "routers": { + "r1": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "lo": { + "source_link": "lo" + } + } + } + } + } + } + } + }, + "static_routes": [ + { + "network": "1.0.2.17/32", + "next_hop": "10.0.0.2" + } + ] + }, + "r2": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + { + "redist_type": "static" + } + ], + "neighbor": { + "r1": { + "dest_link": { + "lo": { + "source_link": "lo" + } + } + }, + "r3": { + "dest_link": { + "lo": { + "source_link": "lo" + } + } + } + } + } + } + } + }, + "static_routes": [ + { + "network": "100.0.20.1/32", + "no_of_ip": 9, + "admin_distance": 100, + "next_hop": "10.0.0.1" + }, + { + "network": "1.0.1.17/32", + "next_hop": "10.0.0.1" + }, + { + "network": "1.0.3.17/32", + "next_hop": "10.0.0.6" + } + ] + }, + "r3": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "lo": { + "source_link": "lo" + } + } + } + } + } + } + } + }, + "static_routes": [ + { + "network": "1.0.2.17/32", + "next_hop": "10.0.0.5" + }, + { + "network": "10.0.0.1/30", + "next_hop": "10.0.0.5" + } + ] + } + } +} diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/test_example_topojson.py b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/test_example_topojson.py new file mode 100755 index 0000000000..b794b96a63 --- /dev/null +++ b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/test_example_topojson.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python + +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. +# ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +<example>.py: Test <example tests>. +""" + +import os +import sys +import time +import json +import inspect +import pytest + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, '../')) +sys.path.append(os.path.join(CWD, '../../')) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from lib.topogen import Topogen, get_topogen + +# Required to instantiate the topology builder class. +from mininet.topo import Topo + +# Import topoJson from lib, to create topology and initial configuration +from lib.common_config import ( + start_topology, write_test_header, + write_test_footer, verify_rib +) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence +) +from lib.topojson import build_topo_from_json, build_config_from_json + +# Reading the data from JSON File for topology and configuration creation +jsonFile = "{}/example_topojson.json".format(CWD) + +try: + with open(jsonFile, 'r') as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + +# Global variables +bgp_convergence = False +input_dict = {} + + +class TemplateTopo(Topo): + """ + Test topology builder + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + "Build function" + tgen = get_topogen(self) + + # This function only purpose is to create topology + # as defined in input json file. + # + # Example + # + # Creating 2 routers having single links in between, + # which is used to establised BGP neighborship + + # Building topology from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("="*40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(TemplateTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # This function only purpose is to create configuration + # as defined in input json file. + # + # Example + # + # Creating configuration defined in input JSON + # file, example, BGP config, interface config, static routes + # config, prefix list config + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + logger.info("Running setup_module() done") + + +def teardown_module(mod): + """ + Teardown the pytest environment + + * `mod`: module name + """ + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + +def test_bgp_convergence(request): + " Test BGP daemon convergence " + + tgen = get_topogen() + global bgp_convergence + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Api call verify whether BGP is converged + bgp_convergence = verify_bgp_convergence(tgen, topo) + assert bgp_convergence is True, "test_bgp_convergence failed.. \n"\ + " Error: {}".format(bgp_convergence) + + logger.info("BGP is converged successfully \n") + write_test_footer(tc_name) + + +def test_static_routes(request): + " Test to create and verify static routes. " + + tgen = get_topogen() + if bgp_convergence is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Static routes are created as part of initial configuration, + # verifying RIB + dut = 'r3' + next_hop = '10.0.0.1' + input_dict = { + "r1": { + "static_routes": [ + { + "network": "100.0.20.1/32", + "no_of_ip": 9, + "admin_distance": 100, + "next_hop": "10.0.0.1" + } + ] + } + } + # Uncomment below to debug + # tgen.mininet_cli() + result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +if __name__ == '__main__': + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py new file mode 100644 index 0000000000..13f8824976 --- /dev/null +++ b/tests/topotests/lib/bgp.py @@ -0,0 +1,1521 @@ +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. +# ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +from copy import deepcopy +from time import sleep +import traceback +import ipaddr +from lib import topotest + +from lib.topolog import logger + +# Import common_config to use commomnly used APIs +from lib.common_config import (create_common_configuration, + InvalidCLIError, + load_config_to_router, + check_address_types, + generate_ips, + find_interface_with_greater_ip) + +BGP_CONVERGENCE_TIMEOUT = 10 + + +def create_router_bgp(tgen, topo, input_dict=None, build=False): + """ + API to configure bgp on router + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `input_dict` : Input dict data, required when configuring from testcase + * `build` : Only for initial setup phase this is set as True. + + Usage + ----- + input_dict = { + "r1": { + "bgp": { + "local_as": "200", + "router_id": "22.22.22.22", + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ], + "advertise_networks": [ + { + "network": "20.0.0.0/32", + "no_of_network": 10 + }, + { + "network": "30.0.0.0/32", + "no_of_network": 10 + } + ], + "neighbor": { + "r3": { + "keepalivetimer": 60, + "holddowntimer": 180, + "dest_link": { + "r4": { + "prefix_lists": [ + { + "name": "pf_list_1", + "direction": "in" + } + ], + "route_maps": [ + {"name": "RMAP_MED_R3", + "direction": "in"} + ], + "next_hop_self": True + } + } + } + } + } + } + } + } + } + } + + + Returns + ------- + True or False + """ + logger.debug("Entering lib API: create_router_bgp()") + result = False + if not input_dict: + input_dict = deepcopy(topo) + else: + topo = topo["routers"] + for router in input_dict.keys(): + if "bgp" not in input_dict[router]: + logger.debug("Router %s: 'bgp' not present in input_dict", router) + continue + + result = __create_bgp_global(tgen, input_dict, router, build) + if result is True: + bgp_data = input_dict[router]["bgp"] + + bgp_addr_data = bgp_data.setdefault("address_family", {}) + + if not bgp_addr_data: + logger.debug("Router %s: 'address_family' not present in " + "input_dict for BGP", router) + else: + + ipv4_data = bgp_addr_data.setdefault("ipv4", {}) + ipv6_data = bgp_addr_data.setdefault("ipv6", {}) + + neigh_unicast = True if ipv4_data.setdefault("unicast", {}) \ + or ipv6_data.setdefault("unicast", {}) else False + + if neigh_unicast: + result = __create_bgp_unicast_neighbor( + tgen, topo, input_dict, router, build) + + logger.debug("Exiting lib API: create_router_bgp()") + return result + + +def __create_bgp_global(tgen, input_dict, router, build=False): + """ + Helper API to create bgp global configuration. + + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : Input dict data, required when configuring from testcase + * `router` : router id to be configured. + * `build` : Only for initial setup phase this is set as True. + + Returns + ------- + True or False + """ + + result = False + logger.debug("Entering lib API: __create_bgp_global()") + try: + + bgp_data = input_dict[router]["bgp"] + del_bgp_action = bgp_data.setdefault("delete", False) + if del_bgp_action: + config_data = ["no router bgp"] + result = create_common_configuration(tgen, router, config_data, + "bgp", build=build) + return result + + config_data = [] + + if "local_as" not in bgp_data and build: + logger.error("Router %s: 'local_as' not present in input_dict" + "for BGP", router) + return False + + local_as = bgp_data.setdefault("local_as", "") + cmd = "router bgp {}".format(local_as) + vrf_id = bgp_data.setdefault("vrf", None) + if vrf_id: + cmd = "{} vrf {}".format(cmd, vrf_id) + + config_data.append(cmd) + + router_id = bgp_data.setdefault("router_id", None) + del_router_id = bgp_data.setdefault("del_router_id", False) + if del_router_id: + config_data.append("no bgp router-id") + if router_id: + config_data.append("bgp router-id {}".format( + router_id)) + + aggregate_address = bgp_data.setdefault("aggregate_address", + {}) + if aggregate_address: + network = aggregate_address.setdefault("network", None) + if not network: + logger.error("Router %s: 'network' not present in " + "input_dict for BGP", router) + else: + cmd = "aggregate-address {}".format(network) + + as_set = aggregate_address.setdefault("as_set", False) + summary = aggregate_address.setdefault("summary", False) + del_action = aggregate_address.setdefault("delete", False) + if as_set: + cmd = "{} {}".format(cmd, "as-set") + if summary: + cmd = "{} {}".format(cmd, "summary") + + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + result = create_common_configuration(tgen, router, config_data, + "bgp", build=build) + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: create_bgp_global()") + return result + + +def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, build=False): + """ + Helper API to create configuration for address-family unicast + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `input_dict` : Input dict data, required when configuring from testcase + * `router` : router id to be configured. + * `build` : Only for initial setup phase this is set as True. + """ + + result = False + logger.debug("Entering lib API: __create_bgp_unicast_neighbor()") + try: + config_data = ["router bgp"] + bgp_data = input_dict[router]["bgp"]["address_family"] + + for addr_type, addr_dict in bgp_data.iteritems(): + if not addr_dict: + continue + + if not check_address_types(addr_type): + continue + + config_data.append("address-family {} unicast".format( + addr_type + )) + addr_data = addr_dict["unicast"] + advertise_network = addr_data.setdefault("advertise_networks", + []) + for advertise_network_dict in advertise_network: + network = advertise_network_dict["network"] + if type(network) is not list: + network = [network] + + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 1 + + del_action = advertise_network_dict.setdefault("delete", + False) + + # Generating IPs for verification + prefix = str( + ipaddr.IPNetwork(unicode(network[0])).prefixlen) + network_list = generate_ips(network, no_of_network) + for ip in network_list: + ip = str(ipaddr.IPNetwork(unicode(ip)).network) + + cmd = "network {}/{}\n".format(ip, prefix) + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + max_paths = addr_data.setdefault("maximum_paths", {}) + if max_paths: + ibgp = max_paths.setdefault("ibgp", None) + ebgp = max_paths.setdefault("ebgp", None) + if ibgp: + config_data.append("maximum-paths ibgp {}".format( + ibgp + )) + if ebgp: + config_data.append("maximum-paths {}".format( + ebgp + )) + + aggregate_address = addr_data.setdefault("aggregate_address", + {}) + if aggregate_address: + ip = aggregate_address("network", None) + attribute = aggregate_address("attribute", None) + if ip: + cmd = "aggregate-address {}".format(ip) + if attribute: + cmd = "{} {}".format(cmd, attribute) + + config_data.append(cmd) + + redistribute_data = addr_data.setdefault("redistribute", {}) + if redistribute_data: + for redistribute in redistribute_data: + if "redist_type" not in redistribute: + logger.error("Router %s: 'redist_type' not present in " + "input_dict", router) + else: + cmd = "redistribute {}".format( + redistribute["redist_type"]) + redist_attr = redistribute.setdefault("attribute", + None) + if redist_attr: + cmd = "{} {}".format(cmd, redist_attr) + del_action = redistribute.setdefault("delete", False) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) + + if "neighbor" in addr_data: + neigh_data = __create_bgp_neighbor(topo, input_dict, + router, addr_type) + config_data.extend(neigh_data) + + for addr_type, addr_dict in bgp_data.iteritems(): + if not addr_dict or not check_address_types(addr_type): + continue + + addr_data = addr_dict["unicast"] + if "neighbor" in addr_data: + neigh_addr_data = __create_bgp_unicast_address_family( + topo, input_dict, router, addr_type) + + config_data.extend(neigh_addr_data) + + result = create_common_configuration(tgen, router, config_data, + None, build=build) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()") + return result + + +def __create_bgp_neighbor(topo, input_dict, router, addr_type): + """ + Helper API to create neighbor specific configuration + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `input_dict` : Input dict data, required when configuring from testcase + * `router` : router id to be configured + """ + + config_data = [] + logger.debug("Entering lib API: __create_bgp_neighbor()") + + bgp_data = input_dict[router]["bgp"]["address_family"] + neigh_data = bgp_data[addr_type]["unicast"]["neighbor"] + + for name, peer_dict in neigh_data.iteritems(): + for dest_link, peer in peer_dict["dest_link"].iteritems(): + nh_details = topo[name] + remote_as = nh_details["bgp"]["local_as"] + update_source = None + + if dest_link in nh_details["links"].keys(): + ip_addr = \ + nh_details["links"][dest_link][addr_type].split("/")[0] + # Loopback interface + if "source_link" in peer and peer["source_link"] == "lo": + update_source = topo[router]["links"]["lo"][ + addr_type].split("/")[0] + + neigh_cxt = "neighbor {}".format(ip_addr) + + config_data.append("{} remote-as {}".format(neigh_cxt, remote_as)) + if addr_type == "ipv6": + config_data.append("address-family ipv6 unicast") + config_data.append("{} activate".format(neigh_cxt)) + + disable_connected = peer.setdefault("disable_connected_check", + False) + keep_alive = peer.setdefault("keep_alive", 60) + hold_down = peer.setdefault("hold_down", 180) + password = peer.setdefault("password", None) + max_hop_limit = peer.setdefault("ebgp_multihop", 1) + + if update_source: + config_data.append("{} update-source {}".format( + neigh_cxt, update_source)) + if disable_connected: + config_data.append("{} disable-connected-check".format( + disable_connected)) + if update_source: + config_data.append("{} update-source {}".format(neigh_cxt, + update_source)) + if int(keep_alive) != 60 and int(hold_down) != 180: + config_data.append( + "{} timers {} {}".format(neigh_cxt, keep_alive, + hold_down)) + if password: + config_data.append( + "{} password {}".format(neigh_cxt, password)) + + if max_hop_limit > 1: + config_data.append("{} ebgp-multihop {}".format(neigh_cxt, + max_hop_limit)) + config_data.append("{} enforce-multihop".format(neigh_cxt)) + + logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()") + return config_data + + +def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type): + """ + API prints bgp global config to bgp_json file. + + Parameters + ---------- + * `bgp_cfg` : BGP class variables have BGP config saved in it for + particular router, + * `local_as_no` : Local as number + * `router_id` : Router-id + * `ecmp_path` : ECMP max path + * `gr_enable` : BGP global gracefull restart config + """ + + config_data = [] + logger.debug("Entering lib API: __create_bgp_unicast_neighbor()") + + bgp_data = input_dict[router]["bgp"]["address_family"] + neigh_data = bgp_data[addr_type]["unicast"]["neighbor"] + + for name, peer_dict in deepcopy(neigh_data).iteritems(): + for dest_link, peer in peer_dict["dest_link"].iteritems(): + deactivate = None + nh_details = topo[name] + # Loopback interface + if "source_link" in peer and peer["source_link"] == "lo": + for destRouterLink, data in sorted(nh_details["links"]. + iteritems()): + if "type" in data and data["type"] == "loopback": + if dest_link == destRouterLink: + ip_addr = \ + nh_details["links"][destRouterLink][ + addr_type].split("/")[0] + + # Physical interface + else: + if dest_link in nh_details["links"].keys(): + + ip_addr = nh_details["links"][dest_link][ + addr_type].split("/")[0] + if addr_type == "ipv4" and bgp_data["ipv6"]: + deactivate = nh_details["links"][ + dest_link]["ipv6"].split("/")[0] + + neigh_cxt = "neighbor {}".format(ip_addr) + config_data.append("address-family {} unicast".format( + addr_type + )) + if deactivate: + config_data.append( + "no neighbor {} activate".format(deactivate)) + + next_hop_self = peer.setdefault("next_hop_self", None) + send_community = peer.setdefault("send_community", None) + prefix_lists = peer.setdefault("prefix_lists", {}) + route_maps = peer.setdefault("route_maps", {}) + + # next-hop-self + if next_hop_self: + config_data.append("{} next-hop-self".format(neigh_cxt)) + # no_send_community + if send_community: + config_data.append("{} send-community".format(neigh_cxt)) + + if prefix_lists: + for prefix_list in prefix_lists: + name = prefix_list.setdefault("name", {}) + direction = prefix_list.setdefault("direction", "in") + del_action = prefix_list.setdefault("delete", False) + if not name: + logger.info("Router %s: 'name' not present in " + "input_dict for BGP neighbor prefix lists", + router) + else: + cmd = "{} prefix-list {} {}".format(neigh_cxt, name, + direction) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) + + if route_maps: + for route_map in route_maps: + name = route_map.setdefault("name", {}) + direction = route_map.setdefault("direction", "in") + del_action = route_map.setdefault("delete", False) + if not name: + logger.info("Router %s: 'name' not present in " + "input_dict for BGP neighbor route name", + router) + else: + cmd = "{} route-map {} {}".format(neigh_cxt, name, + direction) + if del_action: + cmd = "no {}".format(cmd) + config_data.append(cmd) + + return config_data + + +############################################# +# Verification APIs +############################################# +def verify_router_id(tgen, topo, input_dict): + """ + Running command "show ip bgp json" for DUT and reading router-id + from input_dict and verifying with command output. + 1. Statically modfified router-id should take place + 2. When static router-id is deleted highest loopback should + become router-id + 3. When loopback intf is down then highest physcial intf + should become router-id + + Parameters + ---------- + * `tgen`: topogen object + * `topo`: input json file data + * `input_dict`: input dictionary, have details of Device Under Test, for + which user wants to test the data + Usage + ----- + # Verify if router-id for r1 is 12.12.12.12 + input_dict = { + "r1":{ + "router_id": "12.12.12.12" + } + # Verify that router-id for r1 is highest interface ip + input_dict = { + "routers": ["r1"] + } + result = verify_router_id(tgen, topo, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_router_id()") + for router in input_dict.keys(): + if router not in tgen.routers(): + continue + + rnode = tgen.routers()[router] + + del_router_id = input_dict[router]["bgp"].setdefault( + "del_router_id", False) + + logger.info("Checking router %s router-id", router) + show_bgp_json = rnode.vtysh_cmd("show ip bgp json", + isjson=True) + router_id_out = show_bgp_json["routerId"] + router_id_out = ipaddr.IPv4Address(unicode(router_id_out)) + + # Once router-id is deleted, highest interface ip should become + # router-id + if del_router_id: + router_id = find_interface_with_greater_ip(topo, router) + else: + router_id = input_dict[router]["bgp"]["router_id"] + router_id = ipaddr.IPv4Address(unicode(router_id)) + + if router_id == router_id_out: + logger.info("Found expected router-id %s for router %s", + router_id, router) + else: + errormsg = "Router-id for router:{} mismatch, expected:" \ + " {} but found:{}".format(router, router_id, + router_id_out) + return errormsg + + logger.info("Exiting lib API: verify_router_id()") + return True + + +def verify_bgp_convergence(tgen, topo): + """ + API will verify if BGP is converged with in the given time frame. + Running "show bgp summary json" command and verify bgp neighbor + state is established, + + Parameters + ---------- + * `tgen`: topogen object + * `topo`: input json file data + * `addr_type`: ip_type, ipv4/ipv6 + + Usage + ----- + # To veriry is BGP is converged for all the routers used in + topology + results = verify_bgp_convergence(tgen, topo, "ipv4") + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_bgp_confergence()") + for router, rnode in tgen.routers().iteritems(): + logger.info("Verifying BGP Convergence on router %s:", router) + + for retry in range(1, 11): + show_bgp_json = rnode.vtysh_cmd("show bgp summary json", + isjson=True) + # Verifying output dictionary show_bgp_json is empty or not + if not bool(show_bgp_json): + errormsg = "BGP is not running" + return errormsg + + # To find neighbor ip type + total_peer = 0 + + bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] + for addr_type in bgp_addr_type.keys(): + if not check_address_types(addr_type): + continue + + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] + + for bgp_neighbor in bgp_neighbors: + total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"]) + + for addr_type in bgp_addr_type.keys(): + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] + + no_of_peer = 0 + for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): + for dest_link in peer_data["dest_link"].keys(): + data = topo["routers"][bgp_neighbor]["links"] + if dest_link in data: + neighbor_ip = \ + data[dest_link][addr_type].split("/")[0] + if addr_type == "ipv4": + ipv4_data = show_bgp_json["ipv4Unicast"][ + "peers"] + nh_state = ipv4_data[neighbor_ip]["state"] + else: + ipv6_data = show_bgp_json["ipv6Unicast"][ + "peers"] + nh_state = ipv6_data[neighbor_ip]["state"] + + if nh_state == "Established": + no_of_peer += 1 + if no_of_peer == total_peer: + logger.info("BGP is Converged for router %s", router) + break + else: + logger.warning("BGP is not yet Converged for router %s", + router) + sleeptime = 2 * retry + if sleeptime <= BGP_CONVERGENCE_TIMEOUT: + # Waiting for BGP to converge + logger.info("Waiting for %s sec for BGP to converge on" + " router %s...", sleeptime, router) + sleep(sleeptime) + else: + show_bgp_summary = rnode.vtysh_cmd("show bgp summary") + errormsg = "TIMEOUT!! BGP is not converged in {} " \ + "seconds for router {} \n {}".format( + BGP_CONVERGENCE_TIMEOUT, router, + show_bgp_summary) + return errormsg + + logger.info("Exiting API: verify_bgp_confergence()") + return True + + +def modify_as_number(tgen, topo, input_dict): + """ + API reads local_as and remote_as from user defined input_dict and + modify router"s ASNs accordingly. Router"s config is modified and + recent/changed config is loadeded to router. + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `input_dict` : defines for which router ASNs needs to be modified + + Usage + ----- + To modify ASNs for router r1 + input_dict = { + "r1": { + "bgp": { + "local_as": 131079 + } + } + result = modify_as_number(tgen, topo, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: modify_as_number()") + try: + + new_topo = deepcopy(topo["routers"]) + router_dict = {} + for router in input_dict.keys(): + # Remove bgp configuration + + router_dict.update({ + router: { + "bgp": { + "delete": True + } + } + }) + + new_topo[router]["bgp"]["local_as"] = \ + input_dict[router]["bgp"]["local_as"] + + logger.info("Removing bgp configuration") + create_router_bgp(tgen, topo, router_dict) + + logger.info("Applying modified bgp configuration") + create_router_bgp(tgen, new_topo) + + except Exception as e: + # handle any exception + logger.error("Error %s occured. Arguments %s.", e.message, e.args) + + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.info("Exiting lib API: modify_as_number()") + + return True + + +def verify_as_numbers(tgen, topo, input_dict): + """ + This API is to verify AS numbers for given DUT by running + "show ip bgp neighbor json" command. Local AS and Remote AS + will ve verified with input_dict data and command output. + + Parameters + ---------- + * `tgen`: topogen object + * `topo`: input json file data + * `addr_type` : ip type, ipv4/ipv6 + * `input_dict`: defines - for which router, AS numbers needs to be verified + + Usage + ----- + input_dict = { + "r1": { + "bgp": { + "local_as": 131079 + } + } + } + result = verify_as_numbers(tgen, topo, addr_type, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_as_numbers()") + for router in input_dict.keys(): + if router not in tgen.routers(): + continue + + rnode = tgen.routers()[router] + + logger.info("Verifying AS numbers for dut %s:", router) + + show_ip_bgp_neighbor_json = rnode.vtysh_cmd( + "show ip bgp neighbor json", isjson=True) + local_as = input_dict[router]["bgp"]["local_as"] + bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] + + for addr_type in bgp_addr_type: + if not check_address_types(addr_type): + continue + + bgp_neighbors = bgp_addr_type[addr_type]["unicast"][ + "neighbor"] + + for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): + remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"] + for dest_link, peer_dict in peer_data["dest_link"].iteritems(): + neighbor_ip = None + data = topo["routers"][bgp_neighbor]["links"] + + if dest_link in data: + neighbor_ip = data[dest_link][addr_type]. \ + split("/")[0] + neigh_data = show_ip_bgp_neighbor_json[neighbor_ip] + # Verify Local AS for router + if neigh_data["localAs"] != local_as: + errormsg = "Failed: Verify local_as for dut {}," \ + " found: {} but expected: {}".format( + router, neigh_data["localAs"], + local_as) + return errormsg + else: + logger.info("Verified local_as for dut %s, found" + " expected: %s", router, local_as) + + # Verify Remote AS for neighbor + if neigh_data["remoteAs"] != remote_as: + errormsg = "Failed: Verify remote_as for dut " \ + "{}'s neighbor {}, found: {} but " \ + "expected: {}".format( + router, bgp_neighbor, + neigh_data["remoteAs"], remote_as) + return errormsg + else: + logger.info("Verified remote_as for dut %s's " + "neighbor %s, found expected: %s", + router, bgp_neighbor, remote_as) + + logger.info("Exiting lib API: verify_AS_numbers()") + return True + + +def clear_bgp_and_verify(tgen, topo, router): + """ + This API is to clear bgp neighborship and verify bgp neighborship + is coming up(BGP is converged) usinf "show bgp summary json" command + and also verifying for all bgp neighbors uptime before and after + clear bgp sessions is different as the uptime must be changed once + bgp sessions are cleared using "clear ip bgp */clear bgp ipv6 *" cmd. + + Parameters + ---------- + * `tgen`: topogen object + * `topo`: input json file data + * `router`: device under test + + Usage + ----- + result = clear_bgp_and_verify(tgen, topo, addr_type, dut) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: clear_bgp_and_verify()") + + if router not in tgen.routers(): + return False + + rnode = tgen.routers()[router] + + peer_uptime_before_clear_bgp = {} + # Verifying BGP convergence before bgp clear command + for retry in range(1, 11): + sleeptime = 2 * retry + if sleeptime <= BGP_CONVERGENCE_TIMEOUT: + # Waiting for BGP to converge + logger.info("Waiting for %s sec for BGP to converge on router" + " %s...", sleeptime, router) + sleep(sleeptime) + else: + errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \ + " router {}".format(BGP_CONVERGENCE_TIMEOUT, router) + return errormsg + + show_bgp_json = rnode.vtysh_cmd("show bgp summary json", + isjson=True) + logger.info(show_bgp_json) + # Verifying output dictionary show_bgp_json is empty or not + if not bool(show_bgp_json): + errormsg = "BGP is not running" + return errormsg + + # To find neighbor ip type + bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] + total_peer = 0 + for addr_type in bgp_addr_type.keys(): + + if not check_address_types(addr_type): + continue + + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] + + for bgp_neighbor in bgp_neighbors: + total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"]) + + no_of_peer = 0 + for addr_type in bgp_addr_type: + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] + + for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): + for dest_link, peer_dict in peer_data["dest_link"].iteritems(): + data = topo["routers"][bgp_neighbor]["links"] + + if dest_link in data: + neighbor_ip = data[dest_link][addr_type].split("/")[0] + if addr_type == "ipv4": + ipv4_data = show_bgp_json["ipv4Unicast"][ + "peers"] + nh_state = ipv4_data[neighbor_ip]["state"] + + # Peer up time dictionary + peer_uptime_before_clear_bgp[bgp_neighbor] = \ + ipv4_data[neighbor_ip]["peerUptime"] + else: + ipv6_data = show_bgp_json["ipv6Unicast"][ + "peers"] + nh_state = ipv6_data[neighbor_ip]["state"] + + # Peer up time dictionary + peer_uptime_before_clear_bgp[bgp_neighbor] = \ + ipv6_data[neighbor_ip]["peerUptime"] + + if nh_state == "Established": + no_of_peer += 1 + + if no_of_peer == total_peer: + logger.info("BGP is Converged for router %s before bgp" + " clear", router) + break + else: + logger.warning("BGP is not yet Converged for router %s " + "before bgp clear", router) + + # Clearing BGP + logger.info("Clearing BGP neighborship for router %s..", router) + for addr_type in bgp_addr_type.keys(): + if addr_type == "ipv4": + rnode.vtysh_cmd("clear ip bgp *") + elif addr_type == "ipv6": + rnode.vtysh_cmd("clear bgp ipv6 *") + + peer_uptime_after_clear_bgp = {} + # Verifying BGP convergence after bgp clear command + for retry in range(1, 11): + sleeptime = 2 * retry + if sleeptime <= BGP_CONVERGENCE_TIMEOUT: + # Waiting for BGP to converge + logger.info("Waiting for %s sec for BGP to converge on router" + " %s...", sleeptime, router) + sleep(sleeptime) + else: + errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \ + " router {}".format(BGP_CONVERGENCE_TIMEOUT, router) + return errormsg + + show_bgp_json = rnode.vtysh_cmd("show bgp summary json", + isjson=True) + # Verifying output dictionary show_bgp_json is empty or not + if not bool(show_bgp_json): + errormsg = "BGP is not running" + return errormsg + + # To find neighbor ip type + bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] + total_peer = 0 + for addr_type in bgp_addr_type.keys(): + if not check_address_types(addr_type): + continue + + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] + + for bgp_neighbor in bgp_neighbors: + total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"]) + + no_of_peer = 0 + for addr_type in bgp_addr_type: + bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] + + for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): + for dest_link, peer_dict in peer_data["dest_link"].iteritems(): + data = topo["routers"][bgp_neighbor]["links"] + + if dest_link in data: + neighbor_ip = data[dest_link][addr_type].\ + split("/")[0] + if addr_type == "ipv4": + ipv4_data = show_bgp_json["ipv4Unicast"][ + "peers"] + nh_state = ipv4_data[neighbor_ip]["state"] + peer_uptime_after_clear_bgp[bgp_neighbor] = \ + ipv4_data[neighbor_ip]["peerUptime"] + else: + ipv6_data = show_bgp_json["ipv6Unicast"][ + "peers"] + nh_state = ipv6_data[neighbor_ip]["state"] + # Peer up time dictionary + peer_uptime_after_clear_bgp[bgp_neighbor] = \ + ipv6_data[neighbor_ip]["peerUptime"] + + if nh_state == "Established": + no_of_peer += 1 + + if no_of_peer == total_peer: + logger.info("BGP is Converged for router %s after bgp clear", + router) + break + else: + logger.warning("BGP is not yet Converged for router %s after" + " bgp clear", router) + + # Compariung peerUptime dictionaries + if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp: + logger.info("BGP neighborship is reset after clear BGP on router %s", + router) + else: + errormsg = "BGP neighborship is not reset after clear bgp on router" \ + " {}".format(router) + return errormsg + + logger.info("Exiting lib API: clear_bgp_and_verify()") + return True + + +def verify_bgp_timers_and_functionality(tgen, topo, input_dict): + """ + To verify BGP timer config, execute "show ip bgp neighbor json" command + and verify bgp timers with input_dict data. + To veirfy bgp timers functonality, shutting down peer interface + and verify BGP neighborship status. + + Parameters + ---------- + * `tgen`: topogen object + * `topo`: input json file data + * `addr_type`: ip type, ipv4/ipv6 + * `input_dict`: defines for which router, bgp timers needs to be verified + + Usage: + # To verify BGP timers for neighbor r2 of router r1 + input_dict = { + "r1": { + "bgp": { + "bgp_neighbors":{ + "r2":{ + "keepalivetimer": 5, + "holddowntimer": 15, + }}}}} + result = verify_bgp_timers_and_functionality(tgen, topo, "ipv4", + input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_bgp_timers_and_functionality()") + sleep(5) + router_list = tgen.routers() + for router in input_dict.keys(): + if router not in router_list: + continue + + rnode = router_list[router] + + logger.info("Verifying bgp timers functionality, DUT is %s:", + router) + + show_ip_bgp_neighbor_json = \ + rnode.vtysh_cmd("show ip bgp neighbor json", isjson=True) + + bgp_addr_type = input_dict[router]["bgp"]["address_family"] + + for addr_type in bgp_addr_type: + if not check_address_types(addr_type): + continue + + bgp_neighbors = bgp_addr_type[addr_type]["unicast"][ + "neighbor"] + for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): + for dest_link, peer_dict in peer_data["dest_link"].iteritems(): + data = topo["routers"][bgp_neighbor]["links"] + + keepalivetimer = peer_dict["keepalivetimer"] + holddowntimer = peer_dict["holddowntimer"] + + if dest_link in data: + neighbor_ip = data[dest_link][addr_type]. \ + split("/")[0] + neighbor_intf = data[dest_link]["interface"] + + # Verify HoldDownTimer for neighbor + bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[ + neighbor_ip]["bgpTimerHoldTimeMsecs"] + if bgpHoldTimeMsecs != holddowntimer * 1000: + errormsg = "Verifying holddowntimer for bgp " \ + "neighbor {} under dut {}, found: {} " \ + "but expected: {}".format( + neighbor_ip, router, + bgpHoldTimeMsecs, + holddowntimer * 1000) + return errormsg + + # Verify KeepAliveTimer for neighbor + bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[ + neighbor_ip]["bgpTimerKeepAliveIntervalMsecs"] + if bgpKeepAliveTimeMsecs != keepalivetimer * 1000: + errormsg = "Verifying keepalivetimer for bgp " \ + "neighbor {} under dut {}, found: {} " \ + "but expected: {}".format( + neighbor_ip, router, + bgpKeepAliveTimeMsecs, + keepalivetimer * 1000) + return errormsg + + #################### + # Shutting down peer interface after keepalive time and + # after some time bringing up peer interface. + # verifying BGP neighborship in (hold down-keep alive) + # time, it should not go down + #################### + + # Wait till keep alive time + logger.info("=" * 20) + logger.info("Scenario 1:") + logger.info("Shutdown and bring up peer interface: %s " + "in keep alive time : %s sec and verify " + " BGP neighborship is intact in %s sec ", + neighbor_intf, keepalivetimer, + (holddowntimer - keepalivetimer)) + logger.info("=" * 20) + logger.info("Waiting for %s sec..", keepalivetimer) + sleep(keepalivetimer) + + # Shutting down peer ineterface + logger.info("Shutting down interface %s on router %s", + neighbor_intf, bgp_neighbor) + topotest.interface_set_status( + router_list[bgp_neighbor], neighbor_intf, + ifaceaction=False) + + # Bringing up peer interface + sleep(5) + logger.info("Bringing up interface %s on router %s..", + neighbor_intf, bgp_neighbor) + topotest.interface_set_status( + router_list[bgp_neighbor], neighbor_intf, + ifaceaction=True) + + # Verifying BGP neighborship is intact in + # (holddown - keepalive) time + for timer in range(keepalivetimer, holddowntimer, + int(holddowntimer / 3)): + logger.info("Waiting for %s sec..", keepalivetimer) + sleep(keepalivetimer) + sleep(2) + show_bgp_json = \ + rnode.vtysh_cmd("show bgp summary json", + isjson=True) + + if addr_type == "ipv4": + ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] + nh_state = ipv4_data[neighbor_ip]["state"] + else: + ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] + nh_state = ipv6_data[neighbor_ip]["state"] + + if timer == \ + (holddowntimer - keepalivetimer): + if nh_state != "Established": + errormsg = "BGP neighborship has not gone " \ + "down in {} sec for neighbor {}\n" \ + "show_bgp_json: \n {} ".format( + timer, bgp_neighbor, + show_bgp_json) + return errormsg + else: + logger.info("BGP neighborship is intact in %s" + " sec for neighbor %s \n " + "show_bgp_json : \n %s", + timer, bgp_neighbor, + show_bgp_json) + + #################### + # Shutting down peer interface and verifying that BGP + # neighborship is going down in holddown time + #################### + logger.info("=" * 20) + logger.info("Scenario 2:") + logger.info("Shutdown peer interface: %s and verify BGP" + " neighborship has gone down in hold down " + "time %s sec", neighbor_intf, holddowntimer) + logger.info("=" * 20) + + logger.info("Shutting down interface %s on router %s..", + neighbor_intf, bgp_neighbor) + topotest.interface_set_status(router_list[bgp_neighbor], + neighbor_intf, + ifaceaction=False) + + # Verifying BGP neighborship is going down in holddown time + for timer in range(keepalivetimer, + (holddowntimer + keepalivetimer), + int(holddowntimer / 3)): + logger.info("Waiting for %s sec..", keepalivetimer) + sleep(keepalivetimer) + sleep(2) + show_bgp_json = \ + rnode.vtysh_cmd("show bgp summary json", + isjson=True) + + if addr_type == "ipv4": + ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] + nh_state = ipv4_data[neighbor_ip]["state"] + else: + ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] + nh_state = ipv6_data[neighbor_ip]["state"] + + if timer == holddowntimer: + if nh_state == "Established": + errormsg = "BGP neighborship has not gone " \ + "down in {} sec for neighbor {}\n" \ + "show_bgp_json: \n {} ".format( + timer, bgp_neighbor, + show_bgp_json) + return errormsg + else: + logger.info("BGP neighborship has gone down in" + " %s sec for neighbor %s \n" + "show_bgp_json : \n %s", + timer, bgp_neighbor, + show_bgp_json) + + logger.info("Exiting lib API: verify_bgp_timers_and_functionality()") + return True + + +def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict, + attribute): + """ + API is to verify best path according to BGP attributes for given routes. + "show bgp ipv4/6 json" command will be run and verify best path according + to shortest as-path, highest local-preference and med, lowest weight and + route origin IGP>EGP>INCOMPLETE. + + Parameters + ---------- + * `tgen` : topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `tgen` : topogen object + * `attribute` : calculate best path using this attribute + * `input_dict`: defines different routes to calculate for which route + best path is selected + + Usage + ----- + # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from + router r7 to router r1(DUT) as per shortest as-path attribute + input_dict = { + "r7": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "200.50.2.0/32" + }, + { + "network": "200.60.2.0/32" + } + ] + } + } + } + } + } + } + attribute = "localpref" + result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \ + input_dict, attribute) + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: verify_best_path_as_per_bgp_attribute()") + if router not in tgen.routers(): + return False + + rnode = tgen.routers()[router] + + # TODO get addr_type from address + # Verifying show bgp json + command = "show bgp {} json".format(addr_type) + + sleep(2) + logger.info("Verifying router %s RIB for best path:", router) + sh_ip_bgp_json = rnode.vtysh_cmd(command, isjson=True) + + for route_val in input_dict.values(): + net_data = route_val["bgp"]["address_family"]["ipv4"]["unicast"] + networks = net_data["advertise_networks"] + for network in networks: + route = network["network"] + + route_attributes = sh_ip_bgp_json["routes"][route] + _next_hop = None + compare = None + attribute_dict = {} + for route_attribute in route_attributes: + next_hops = route_attribute["nexthops"] + for next_hop in next_hops: + next_hop_ip = next_hop["ip"] + attribute_dict[next_hop_ip] = route_attribute[attribute] + + # AS_PATH attribute + if attribute == "aspath": + # Find next_hop for the route have minimum as_path + _next_hop = min(attribute_dict, key=lambda x: len(set( + attribute_dict[x]))) + compare = "SHORTEST" + + # LOCAL_PREF attribute + elif attribute == "localpref": + # Find next_hop for the route have highest local preference + _next_hop = max(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "HIGHEST" + + # WEIGHT attribute + elif attribute == "weight": + # Find next_hop for the route have highest weight + _next_hop = max(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "HIGHEST" + + # ORIGIN attribute + elif attribute == "origin": + # Find next_hop for the route have IGP as origin, - + # - rule is IGP>EGP>INCOMPLETE + _next_hop = [key for (key, value) in + attribute_dict.iteritems() + if value == "IGP"][0] + compare = "" + + # MED attribute + elif attribute == "med": + # Find next_hop for the route have LOWEST MED + _next_hop = min(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "LOWEST" + + # Show ip route + if addr_type == "ipv4": + command = "show ip route json" + else: + command = "show ipv6 route json" + + rib_routes_json = rnode.vtysh_cmd(command, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if not bool(rib_routes_json): + errormsg = "No route found in RIB of router {}..". \ + format(router) + return errormsg + + st_found = False + nh_found = False + # Find best is installed in RIB + if route in rib_routes_json: + st_found = True + # Verify next_hop in rib_routes_json + if rib_routes_json[route][0]["nexthops"][0]["ip"] == \ + _next_hop: + nh_found = True + else: + errormsg = "Incorrect Nexthop for BGP route {} in " \ + "RIB of router {}, Expected: {}, Found:" \ + " {}\n".format(route, router, + rib_routes_json[route][0][ + "nexthops"][0]["ip"], + _next_hop) + return errormsg + + if st_found and nh_found: + logger.info( + "Best path for prefix: %s with next_hop: %s is " + "installed according to %s %s: (%s) in RIB of " + "router %s", route, _next_hop, compare, + attribute, attribute_dict[_next_hop], router) + + logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()") + return True + + +def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict, + attribute): + """ + API is to verify best path according to admin distance for given + route. "show ip/ipv6 route json" command will be run and verify + best path accoring to shortest admin distanc. + + Parameters + ---------- + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test + * `tgen` : topogen object + * `attribute` : calculate best path using admin distance + * `input_dict`: defines different routes with different admin distance + to calculate for which route best path is selected + Usage + ----- + # To verify best path for route 200.50.2.0/32 from router r2 to + router r1(DUT) as per shortest admin distance which is 60. + input_dict = { + "r2": { + "static_routes": [{"network": "200.50.2.0/32", \ + "admin_distance": 80, "next_hop": "10.0.0.14"}, + {"network": "200.50.2.0/32", \ + "admin_distance": 60, "next_hop": "10.0.0.18"}] + }} + attribute = "localpref" + result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \ + input_dict, attribute): + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_best_path_as_per_admin_distance()") + router_list = tgen.routers() + if router not in router_list: + return False + + rnode = tgen.routers()[router] + + sleep(2) + logger.info("Verifying router %s RIB for best path:", router) + + # Show ip route cmd + if addr_type == "ipv4": + command = "show ip route json" + else: + command = "show ipv6 route json" + + for routes_from_router in input_dict.keys(): + sh_ip_route_json = router_list[routes_from_router].vtysh_cmd( + command, isjson=True) + networks = input_dict[routes_from_router]["static_routes"] + for network in networks: + route = network["network"] + + route_attributes = sh_ip_route_json[route] + _next_hop = None + compare = None + attribute_dict = {} + for route_attribute in route_attributes: + next_hops = route_attribute["nexthops"] + for next_hop in next_hops: + next_hop_ip = next_hop["ip"] + attribute_dict[next_hop_ip] = route_attribute["distance"] + + # Find next_hop for the route have LOWEST Admin Distance + _next_hop = min(attribute_dict, key=(lambda k: + attribute_dict[k])) + compare = "LOWEST" + + # Show ip route + rib_routes_json = rnode.vtysh_cmd(command, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if not bool(rib_routes_json): + errormsg = "No route found in RIB of router {}..".format(router) + return errormsg + + st_found = False + nh_found = False + # Find best is installed in RIB + if route in rib_routes_json: + st_found = True + # Verify next_hop in rib_routes_json + if rib_routes_json[route][0]["nexthops"][0]["ip"] == \ + _next_hop: + nh_found = True + else: + errormsg = ("Nexthop {} is Missing for BGP route {}" + " in RIB of router {}\n".format(_next_hop, + route, router)) + return errormsg + + if st_found and nh_found: + logger.info("Best path for prefix: %s is installed according" + " to %s %s: (%s) in RIB of router %s", route, + compare, attribute, + attribute_dict[_next_hop], router) + + logger.info( + "Exiting lib API: verify_best_path_as_per_admin_distance()") + return True diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py new file mode 100644 index 0000000000..d2c1d82430 --- /dev/null +++ b/tests/topotests/lib/common_config.py @@ -0,0 +1,1391 @@ +# +# Copyright (c) 2019 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. +# ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +from collections import OrderedDict +from datetime import datetime +from time import sleep +from subprocess import call +from subprocess import STDOUT as SUB_STDOUT +import StringIO +import os +import ConfigParser +import traceback +import socket +import ipaddr + +from lib.topolog import logger, logger_config +from lib.topogen import TopoRouter + + +FRRCFG_FILE = "frr_json.conf" +FRRCFG_BKUP_FILE = "frr_json_initial.conf" + +ERROR_LIST = ["Malformed", "Failure", "Unknown"] + +#### +CD = os.path.dirname(os.path.realpath(__file__)) +PYTESTINI_PATH = os.path.join(CD, "../pytest.ini") + +# Creating tmp dir with testsuite name to avoid conflict condition when +# multiple testsuites run together. All temporary files would be created +# in this dir and this dir would be removed once testsuite run is +# completed +LOGDIR = "/tmp/topotests/" +TMPDIR = None + +# NOTE: to save execution logs to log file frrtest_log_dir must be configured +# in `pytest.ini`. +config = ConfigParser.ConfigParser() +config.read(PYTESTINI_PATH) + +config_section = "topogen" + +if config.has_option("topogen", "verbosity"): + loglevel = config.get("topogen", "verbosity") + loglevel = loglevel.upper() +else: + loglevel = "INFO" + +if config.has_option("topogen", "frrtest_log_dir"): + frrtest_log_dir = config.get("topogen", "frrtest_log_dir") + time_stamp = datetime.time(datetime.now()) + logfile_name = "frr_test_bgp_" + frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp) + print("frrtest_log_file..", frrtest_log_file) + + logger = logger_config.get_logger(name="test_execution_logs", + log_level=loglevel, + target=frrtest_log_file) + print("Logs will be sent to logfile: {}".format(frrtest_log_file)) + +if config.has_option("topogen", "show_router_config"): + show_router_config = config.get("topogen", "show_router_config") +else: + show_router_config = False + +# env variable for setting what address type to test +ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES") + + +# Saves sequence id numbers +SEQ_ID = { + "prefix_lists": {}, + "route_maps": {} +} + + +def get_seq_id(obj_type, router, obj_name): + """ + Generates and saves sequence number in interval of 10 + + Parameters + ---------- + * `obj_type`: prefix_lists or route_maps + * `router`: router name + *` obj_name`: name of the prefix-list or route-map + + Returns + -------- + Sequence number generated + """ + + router_data = SEQ_ID[obj_type].setdefault(router, {}) + obj_data = router_data.setdefault(obj_name, {}) + seq_id = obj_data.setdefault("seq_id", 0) + + seq_id = int(seq_id) + 10 + obj_data["seq_id"] = seq_id + + return seq_id + + +def set_seq_id(obj_type, router, id, obj_name): + """ + Saves sequence number if not auto-generated and given by user + + Parameters + ---------- + * `obj_type`: prefix_lists or route_maps + * `router`: router name + *` obj_name`: name of the prefix-list or route-map + """ + router_data = SEQ_ID[obj_type].setdefault(router, {}) + obj_data = router_data.setdefault(obj_name, {}) + seq_id = obj_data.setdefault("seq_id", 0) + + seq_id = int(seq_id) + int(id) + obj_data["seq_id"] = seq_id + + +class InvalidCLIError(Exception): + """Raise when the CLI command is wrong""" + pass + + +def create_common_configuration(tgen, router, data, config_type=None, + build=False): + """ + API to create object of class FRRConfig and also create frr_json.conf + file. It will create interface and common configurations and save it to + frr_json.conf and load to router + + Parameters + ---------- + * `tgen`: tgen onject + * `data`: Congiguration data saved in a list. + * `router` : router id to be configured. + * `config_type` : Syntactic information while writing configuration. Should + be one of the value as mentioned in the config_map below. + * `build` : Only for initial setup phase this is set as True + + Returns + ------- + True or False + """ + TMPDIR = os.path.join(LOGDIR, tgen.modname) + + fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE) + + config_map = OrderedDict({ + "general_config": "! FRR General Config\n", + "interface_config": "! Interfaces Config\n", + "static_route": "! Static Route Config\n", + "prefix_list": "! Prefix List Config\n", + "route_maps": "! Route Maps Config\n", + "bgp": "! BGP Config\n" + }) + + if build: + mode = "a" + else: + mode = "w" + + try: + frr_cfg_fd = open(fname, mode) + if config_type: + frr_cfg_fd.write(config_map[config_type]) + for line in data: + frr_cfg_fd.write("{} \n".format(str(line))) + + except IOError as err: + logger.error("Unable to open FRR Config File. error(%s): %s" % + (err.errno, err.strerror)) + return False + finally: + frr_cfg_fd.close() + + # If configuration applied from build, it will done at last + if not build: + load_config_to_router(tgen, router) + + return True + + +def reset_config_on_routers(tgen, routerName=None): + """ + Resets configuration on routers to the snapshot created using input JSON + file. It replaces existing router configuration with FRRCFG_BKUP_FILE + + Parameters + ---------- + * `tgen` : Topogen object + * `routerName` : router config is to be reset + """ + + logger.debug("Entering API: reset_config_on_routers") + + router_list = tgen.routers() + for rname, router in router_list.iteritems(): + if routerName and routerName != rname: + continue + + cfg = router.run("vtysh -c 'show running'") + fname = "{}/{}/frr.sav".format(TMPDIR, rname) + dname = "{}/{}/delta.conf".format(TMPDIR, rname) + f = open(fname, "w") + for line in cfg.split("\n"): + line = line.strip() + + if (line == "Building configuration..." or + line == "Current configuration:" or + not line): + continue + f.write(line) + f.write("\n") + + f.close() + + command = "/usr/lib/frr/frr-reload.py --input {}/{}/frr.sav" \ + " --test {}/{}/frr_json_initial.conf > {}". \ + format(TMPDIR, rname, TMPDIR, rname, dname) + result = call(command, shell=True, stderr=SUB_STDOUT) + + # Assert if command fail + if result > 0: + errormsg = ("Command:{} is failed due to non-zero exit" + " code".format(command)) + return errormsg + + f = open(dname, "r") + delta = StringIO.StringIO() + delta.write("configure terminal\n") + t_delta = f.read() + for line in t_delta.split("\n"): + line = line.strip() + if (line == "Lines To Delete" or + line == "===============" or + line == "Lines To Add" or + line == "============" or + not line): + continue + delta.write(line) + delta.write("\n") + + delta.write("end\n") + output = router.vtysh_multicmd(delta.getvalue(), + pretty_output=False) + logger.info("New configuration for router {}:".format(rname)) + delta.close() + delta = StringIO.StringIO() + cfg = router.run("vtysh -c 'show running'") + for line in cfg.split("\n"): + line = line.strip() + delta.write(line) + delta.write("\n") + + # Router current configuration to log file or console if + # "show_router_config" is defined in "pytest.ini" + if show_router_config: + logger.info(delta.getvalue()) + delta.close() + + logger.debug("Exting API: reset_config_on_routers") + return True + + +def load_config_to_router(tgen, routerName, save_bkup=False): + """ + Loads configuration on router from the file FRRCFG_FILE. + + Parameters + ---------- + * `tgen` : Topogen object + * `routerName` : router for which configuration to be loaded + * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE + """ + + logger.debug("Entering API: load_config_to_router") + + router_list = tgen.routers() + for rname, router in router_list.iteritems(): + if rname == routerName: + try: + frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE) + frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname, + FRRCFG_BKUP_FILE) + with open(frr_cfg_file, "r") as cfg: + data = cfg.read() + if save_bkup: + with open(frr_cfg_bkup, "w") as bkup: + bkup.write(data) + + output = router.vtysh_multicmd(data, pretty_output=False) + for out_err in ERROR_LIST: + if out_err.lower() in output.lower(): + raise InvalidCLIError("%s" % output) + except IOError as err: + errormsg = ("Unable to open config File. error(%s):" + " %s", (err.errno, err.strerror)) + return errormsg + + logger.info("New configuration for router {}:".format(rname)) + new_config = router.run("vtysh -c 'show running'") + + # Router current configuration to log file or console if + # "show_router_config" is defined in "pytest.ini" + if show_router_config: + logger.info(new_config) + + logger.debug("Exting API: load_config_to_router") + return True + + +def start_topology(tgen): + """ + Starting topology, create tmp files which are loaded to routers + to start deamons and then start routers + * `tgen` : topogen object + """ + + global TMPDIR + # Starting topology + tgen.start_topology() + + # Starting deamons + router_list = tgen.routers() + TMPDIR = os.path.join(LOGDIR, tgen.modname) + + for rname, router in router_list.iteritems(): + try: + os.chdir(TMPDIR) + + # Creating rouer named dir and empty zebra.conf bgpd.conf files + # inside the current directory + + if os.path.isdir('{}'.format(rname)): + os.system("rm -rf {}".format(rname)) + os.mkdir('{}'.format(rname)) + os.system('chmod -R go+rw {}'.format(rname)) + os.chdir('{}/{}'.format(TMPDIR, rname)) + os.system('touch zebra.conf bgpd.conf') + else: + os.mkdir('{}'.format(rname)) + os.system('chmod -R go+rw {}'.format(rname)) + os.chdir('{}/{}'.format(TMPDIR, rname)) + os.system('touch zebra.conf bgpd.conf') + + except IOError as (errno, strerror): + logger.error("I/O error({0}): {1}".format(errno, strerror)) + + # Loading empty zebra.conf file to router, to start the zebra deamon + router.load_config( + TopoRouter.RD_ZEBRA, + '{}/{}/zebra.conf'.format(TMPDIR, rname) + # os.path.join(tmpdir, '{}/zebra.conf'.format(rname)) + ) + # Loading empty bgpd.conf file to router, to start the bgp deamon + router.load_config( + TopoRouter.RD_BGP, + '{}/{}/bgpd.conf'.format(TMPDIR, rname) + # os.path.join(tmpdir, '{}/bgpd.conf'.format(rname)) + ) + + # Starting routers + logger.info("Starting all routers once topology is created") + tgen.start_router() + + +def number_to_row(routerName): + """ + Returns the number for the router. + Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23 + etc + """ + return int(routerName[1:]) + + +def number_to_column(routerName): + """ + Returns the number for the router. + Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1, + z23 = column 26 etc + """ + return ord(routerName[0]) - 97 + + +############################################# +# Common APIs, will be used by all protocols +############################################# + +def validate_ip_address(ip_address): + """ + Validates the type of ip address + + Parameters + ---------- + * `ip_address`: IPv4/IPv6 address + + Returns + ------- + Type of address as string + """ + + if "/" in ip_address: + ip_address = ip_address.split("/")[0] + + v4 = True + v6 = True + try: + socket.inet_aton(ip_address) + except socket.error as error: + logger.debug("Not a valid IPv4 address") + v4 = False + else: + return "ipv4" + + try: + socket.inet_pton(socket.AF_INET6, ip_address) + except socket.error as error: + logger.debug("Not a valid IPv6 address") + v6 = False + else: + return "ipv6" + + if not v4 and not v6: + raise Exception("InvalidIpAddr", "%s is neither valid IPv4 or IPv6" + " address" % ip_address) + + +def check_address_types(addr_type): + """ + Checks environment variable set and compares with the current address type + """ + global ADDRESS_TYPES + if ADDRESS_TYPES is None: + ADDRESS_TYPES = "dual" + + if ADDRESS_TYPES == "dual": + ADDRESS_TYPES = ["ipv4", "ipv6"] + elif ADDRESS_TYPES == "ipv4": + ADDRESS_TYPES = ["ipv4"] + elif ADDRESS_TYPES == "ipv6": + ADDRESS_TYPES = ["ipv6"] + + if addr_type not in ADDRESS_TYPES: + logger.error("{} not in supported/configured address types {}". + format(addr_type, ADDRESS_TYPES)) + return False + + return ADDRESS_TYPES + + +def generate_ips(network, no_of_ips): + """ + Returns list of IPs. + based on start_ip and no_of_ips + + * `network` : from here the ip will start generating, start_ip will be + first ip + * `no_of_ips` : these many IPs will be generated + + Limitation: It will generate IPs only for ip_mask 32 + + """ + ipaddress_list = [] + if type(network) is not list: + network = [network] + + for start_ipaddr in network: + if "/" in start_ipaddr: + start_ip = start_ipaddr.split("/")[0] + mask = int(start_ipaddr.split("/")[1]) + + addr_type = validate_ip_address(start_ip) + if addr_type == "ipv4": + start_ip = ipaddr.IPv4Address(unicode(start_ip)) + step = 2 ** (32 - mask) + if addr_type == "ipv6": + start_ip = ipaddr.IPv6Address(unicode(start_ip)) + step = 2 ** (128 - mask) + + next_ip = start_ip + count = 0 + while count < no_of_ips: + ipaddress_list.append("{}/{}".format(next_ip, mask)) + if addr_type == "ipv6": + next_ip = ipaddr.IPv6Address(int(next_ip) + step) + else: + next_ip += step + count += 1 + + return ipaddress_list + + +def find_interface_with_greater_ip(topo, router, loopback=True, + interface=True): + """ + Returns highest interface ip for ipv4/ipv6. If loopback is there then + it will return highest IP from loopback IPs otherwise from physical + interface IPs. + + * `topo` : json file data + * `router` : router for which hightest interface should be calculated + """ + + link_data = topo["routers"][router]["links"] + lo_list = [] + interfaces_list = [] + lo_exists = False + for destRouterLink, data in sorted(link_data.iteritems()): + if loopback: + if "type" in data and data["type"] == "loopback": + lo_exists = True + ip_address = topo["routers"][router]["links"][ + destRouterLink]["ipv4"].split("/")[0] + lo_list.append(ip_address) + if interface: + ip_address = topo["routers"][router]["links"][ + destRouterLink]["ipv4"].split("/")[0] + interfaces_list.append(ip_address) + + if lo_exists: + return sorted(lo_list)[-1] + + return sorted(interfaces_list)[-1] + + +def write_test_header(tc_name): + """ Display message at beginning of test case""" + count = 20 + logger.info("*"*(len(tc_name)+count)) + logger.info("START -> Testcase : %s", tc_name) + logger.info("*"*(len(tc_name)+count)) + + +def write_test_footer(tc_name): + """ Display message at end of test case""" + count = 21 + logger.info("="*(len(tc_name)+count)) + logger.info("PASSED -> Testcase : %s", tc_name) + logger.info("="*(len(tc_name)+count)) + + +############################################# +# These APIs, will used by testcase +############################################# +def create_interfaces_cfg(tgen, topo, build=False): + """ + Create interface configuration for created topology. Basic Interface + configuration is provided in input json file. + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `build` : Only for initial setup phase this is set as True. + + Returns + ------- + True or False + """ + result = False + + try: + for c_router, c_data in topo.iteritems(): + interface_data = [] + for destRouterLink, data in sorted(c_data["links"].iteritems()): + # Loopback interfaces + if "type" in data and data["type"] == "loopback": + interface_name = destRouterLink + else: + interface_name = data["interface"] + interface_data.append("interface {}\n".format( + str(interface_name) + )) + if "ipv4" in data: + intf_addr = c_data["links"][destRouterLink]["ipv4"] + interface_data.append("ip address {}\n".format( + intf_addr + )) + if "ipv6" in data: + intf_addr = c_data["links"][destRouterLink]["ipv6"] + interface_data.append("ipv6 address {}\n".format( + intf_addr + )) + result = create_common_configuration(tgen, c_router, + interface_data, + "interface_config", + build=build) + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + return result + + +def create_static_routes(tgen, input_dict, build=False): + """ + Create static routes for given router as defined in input_dict + + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : Input dict data, required when configuring from testcase + * `build` : Only for initial setup phase this is set as True. + + Usage + ----- + input_dict should be in the format below: + # static_routes: list of all routes + # network: network address + # no_of_ip: number of next-hop address that will be configured + # admin_distance: admin distance for route/routes. + # next_hop: starting next-hop address + # tag: tag id for static routes + # delete: True if config to be removed. Default False. + + Example: + "routers": { + "r1": { + "static_routes": [ + { + "network": "100.0.20.1/32", + "no_of_ip": 9, + "admin_distance": 100, + "next_hop": "10.0.0.1", + "tag": 4001 + "delete": true + } + ] + } + } + + Returns + ------- + errormsg(str) or True + """ + result = False + logger.debug("Entering lib API: create_static_routes()") + try: + for router in input_dict.keys(): + if "static_routes" not in input_dict[router]: + errormsg = "static_routes not present in input_dict" + logger.info(errormsg) + continue + + static_routes_list = [] + + static_routes = input_dict[router]["static_routes"] + for static_route in static_routes: + del_action = static_route.setdefault("delete", False) + # No of IPs + no_of_ip = static_route.setdefault("no_of_ip", 1) + admin_distance = static_route.setdefault("admin_distance", + None) + tag = static_route.setdefault("tag", None) + if "next_hop" not in static_route or \ + "network" not in static_route: + errormsg = "'next_hop' or 'network' missing in" \ + " input_dict" + return errormsg + + next_hop = static_route["next_hop"] + network = static_route["network"] + ip_list = generate_ips([network], no_of_ip) + for ip in ip_list: + addr_type = validate_ip_address(ip) + if addr_type == "ipv4": + cmd = "ip route {} {}".format(ip, next_hop) + else: + cmd = "ipv6 route {} {}".format(ip, next_hop) + + if tag: + cmd = "{} {}".format(cmd, str(tag)) + if admin_distance: + cmd = "{} {}".format(cmd, admin_distance) + + if del_action: + cmd = "no {}".format(cmd) + + static_routes_list.append(cmd) + + result = create_common_configuration(tgen, router, + static_routes_list, + "static_route", + build=build) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: create_static_routes()") + return result + + +def create_prefix_lists(tgen, input_dict, build=False): + """ + Create ip prefix lists as per the config provided in input + JSON or input_dict + + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : Input dict data, required when configuring from testcase + * `build` : Only for initial setup phase this is set as True. + + Usage + ----- + # pf_lists_1: name of prefix-list, user defined + # seqid: prefix-list seqid, auto-generated if not given by user + # network: criteria for applying prefix-list + # action: permit/deny + # le: less than or equal number of bits + # ge: greater than or equal number of bits + + Example + ------- + input_dict = { + "r1": { + "prefix_lists":{ + "ipv4": { + "pf_list_1": [ + { + "seqid": 10, + "network": "any", + "action": "permit", + "le": "32", + "ge": "30", + "delete": True + } + ] + } + } + } + } + + Returns + ------- + errormsg or True + """ + + logger.debug("Entering lib API: create_prefix_lists()") + result = False + try: + for router in input_dict.keys(): + if "prefix_lists" not in input_dict[router]: + errormsg = "prefix_lists not present in input_dict" + logger.info(errormsg) + continue + + config_data = [] + prefix_lists = input_dict[router]["prefix_lists"] + for addr_type, prefix_data in prefix_lists.iteritems(): + if not check_address_types(addr_type): + continue + + for prefix_name, prefix_list in prefix_data.iteritems(): + for prefix_dict in prefix_list: + if "action" not in prefix_dict or \ + "network" not in prefix_dict: + errormsg = "'action' or network' missing in" \ + " input_dict" + return errormsg + + network_addr = prefix_dict["network"] + action = prefix_dict["action"] + le = prefix_dict.setdefault("le", None) + ge = prefix_dict.setdefault("ge", None) + seqid = prefix_dict.setdefault("seqid", None) + del_action = prefix_dict.setdefault("delete", False) + if seqid is None: + seqid = get_seq_id("prefix_lists", router, + prefix_name) + else: + set_seq_id("prefix_lists", router, seqid, + prefix_name) + + if addr_type == "ipv4": + protocol = "ip" + else: + protocol = "ipv6" + + cmd = "{} prefix-list {} seq {} {} {}".format( + protocol, prefix_name, seqid, action, network_addr + ) + if le: + cmd = "{} le {}".format(cmd, le) + if ge: + cmd = "{} ge {}".format(cmd, ge) + + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + result = create_common_configuration(tgen, router, + config_data, + "prefix_list", + build=build) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: create_prefix_lists()") + return result + + +def create_route_maps(tgen, input_dict, build=False): + """ + Create route-map on the devices as per the arguments passed + + Parameters + ---------- + * `tgen` : Topogen object + * `input_dict` : Input dict data, required when configuring from testcase + * `build` : Only for initial setup phase this is set as True. + + Usage + ----- + # route_maps: key, value pair for route-map name and its attribute + # rmap_match_prefix_list_1: user given name for route-map + # action: PERMIT/DENY + # match: key,value pair for match criteria. prefix_list, community-list, + large-community-list or tag. Only one option at a time. + # prefix_list: name of prefix list + # large-community-list: name of large community list + # community-ist: name of community list + # tag: tag id for static routes + # set: key, value pair for modifying route attributes + # localpref: preference value for the network + # med: metric value advertised for AS + # aspath: set AS path value + # weight: weight for the route + # community: standard community value to be attached + # large_community: large community value to be attached + # community_additive: if set to "additive", adds community/large-community + value to the existing values of the network prefix + + Example: + -------- + input_dict = { + "r1": { + "route_maps": { + "rmap_match_prefix_list_1": [ + { + "action": "PERMIT", + "match": { + "ipv4": { + "prefix_list": "pf_list_1" + } + "ipv6": { + "prefix_list": "pf_list_1" + } + + "large-community-list": "{ + "id": "community_1", + "exact_match": True + } + "community": { + "id": "community_2", + "exact_match": True + } + "tag": "tag_id" + }, + "set": { + "localpref": 150, + "med": 30, + "aspath": { + "num": 20000, + "action": "prepend", + }, + "weight": 500, + "community": { + "num": "1:2 2:3", + "action": additive + } + "large_community": { + "num": "1:2:3 4:5;6", + "action": additive + }, + } + } + ] + } + } + } + + Returns + ------- + errormsg(str) or True + """ + + result = False + logger.debug("Entering lib API: create_route_maps()") + + try: + for router in input_dict.keys(): + if "route_maps" not in input_dict[router]: + errormsg = "route_maps not present in input_dict" + logger.info(errormsg) + continue + rmap_data = [] + for rmap_name, rmap_value in \ + input_dict[router]["route_maps"].iteritems(): + + for rmap_dict in rmap_value: + del_action = rmap_dict.setdefault("delete", False) + + if del_action: + rmap_data.append("no route-map {}".format(rmap_name)) + continue + + if "action" not in rmap_dict: + errormsg = "action not present in input_dict" + logger.error(errormsg) + return False + + rmap_action = rmap_dict.setdefault("action", "deny") + + seq_id = rmap_dict.setdefault("seq_id", None) + if seq_id is None: + seq_id = get_seq_id("route_maps", router, rmap_name) + else: + set_seq_id("route_maps", router, seq_id, rmap_name) + + rmap_data.append("route-map {} {} {}".format( + rmap_name, rmap_action, seq_id + )) + + # Verifying if SET criteria is defined + if "set" in rmap_dict: + set_data = rmap_dict["set"] + + local_preference = set_data.setdefault("localpref", + None) + metric = set_data.setdefault("med", None) + as_path = set_data.setdefault("aspath", {}) + weight = set_data.setdefault("weight", None) + community = set_data.setdefault("community", {}) + large_community = set_data.setdefault( + "large_community", {}) + set_action = set_data.setdefault("set_action", None) + + # Local Preference + if local_preference: + rmap_data.append("set local-preference {}". + format(local_preference)) + + # Metric + if metric: + rmap_data.append("set metric {} \n".format(metric)) + + # AS Path Prepend + if as_path: + as_num = as_path.setdefault("as_num", None) + as_action = as_path.setdefault("as_action", None) + if as_action and as_num: + rmap_data.append("set as-path {} {}". + format(as_action, as_num)) + + # Community + if community: + num = community.setdefault("num", None) + comm_action = community.setdefault("action", None) + if num: + cmd = "set community {}".format(num) + if comm_action: + cmd = "{} {}".format(cmd, comm_action) + rmap_data.append(cmd) + else: + logger.error("In community, AS Num not" + " provided") + return False + + if large_community: + num = large_community.setdefault("num", None) + comm_action = large_community.setdefault("action", + None) + if num: + cmd = "set large-community {}".format(num) + if comm_action: + cmd = "{} {}".format(cmd, comm_action) + + rmap_data.append(cmd) + else: + logger.errror("In large_community, AS Num not" + " provided") + return False + + # Weight + if weight: + rmap_data.append("set weight {} \n".format( + weight)) + + # Adding MATCH and SET sequence to RMAP if defined + if "match" in rmap_dict: + match_data = rmap_dict["match"] + ipv4_data = match_data.setdefault("ipv4", {}) + ipv6_data = match_data.setdefault("ipv6", {}) + community = match_data.setdefault("community-list", + {}) + large_community = match_data.setdefault( + "large-community-list", {} + ) + tag = match_data.setdefault("tag", None) + + if ipv4_data: + prefix_name = ipv4_data.setdefault("prefix_lists", + None) + if prefix_name: + rmap_data.append("match ip address prefix-list" + " {}".format(prefix_name)) + if ipv6_data: + prefix_name = ipv6_data.setdefault("prefix_lists", + None) + if prefix_name: + rmap_data.append("match ipv6 address " + "prefix-list {}". + format(prefix_name)) + if tag: + rmap_data.append("match tag {}".format(tag)) + + if community: + if "id" not in community: + logger.error("'id' is mandatory for " + "community-list in match" + " criteria") + return False + cmd = "match community {}".format(community["id"]) + exact_match = community.setdefault("exact_match", + False) + if exact_match: + cmd = "{} exact-match".format(cmd) + + rmap_data.append(cmd) + + if large_community: + if "id" not in large_community: + logger.error("'num' is mandatory for " + "large-community-list in match " + "criteria") + return False + cmd = "match large-community {}".format( + large_community["id"]) + exact_match = large_community.setdefault( + "exact_match", False) + if exact_match: + cmd = "{} exact-match".format(cmd) + + rmap_data.append(cmd) + + result = create_common_configuration(tgen, router, + rmap_data, + "route_maps", + build=build) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: create_prefix_lists()") + return result + + +############################################# +# Verification APIs +############################################# +def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): + """ + Data will be read from input_dict or input JSON file, API will generate + same prefixes, which were redistributed by either create_static_routes() or + advertise_networks_using_network_command() and do will verify next_hop and + each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json" + command o/p. + + Parameters + ---------- + * `tgen` : topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test, for which user wants to test the data + * `input_dict` : input dict, has details of static routes + * `next_hop`[optional]: next_hop which needs to be verified, + default: static + * `protocol`[optional]: protocol, default = None + + Usage + ----- + # RIB can be verified for static routes OR network advertised using + network command. Following are input_dicts to create static routes + and advertise networks using network command. Any one of the input_dict + can be passed to verify_rib() to verify routes in DUT"s RIB. + + # Creating static routes for r1 + input_dict = { + "r1": { + "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \ + "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}] + }} + # Advertising networks using network command in router r1 + input_dict = { + "r1": { + "advertise_networks": [{"start_ip": "20.0.0.0/32", + "no_of_network": 10}, + {"start_ip": "30.0.0.0/32"}] + }} + # Verifying ipv4 routes in router r1 learned via BGP + dut = "r2" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_rib()") + + router_list = tgen.routers() + for routerInput in input_dict.keys(): + for router, rnode in router_list.iteritems(): + if router != dut: + continue + + # Verifying RIB routes + if addr_type == "ipv4": + if protocol: + command = "show ip route {} json".format(protocol) + else: + command = "show ip route json" + else: + if protocol: + command = "show ipv6 route {} json".format(protocol) + else: + command = "show ipv6 route json" + + sleep(2) + logger.info("Checking router %s RIB:", router) + rib_routes_json = rnode.vtysh_cmd(command, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) is False: + errormsg = "No {} route found in rib of router {}..". \ + format(protocol, router) + return errormsg + + if "static_routes" in input_dict[routerInput]: + static_routes = input_dict[routerInput]["static_routes"] + st_found = False + nh_found = False + found_routes = [] + missing_routes = [] + for static_route in static_routes: + network = static_route["network"] + if "no_of_ip" in static_route: + no_of_ip = static_route["no_of_ip"] + else: + no_of_ip = 0 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_ip) + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + if st_rt in rib_routes_json: + st_found = True + found_routes.append(st_rt) + + if next_hop: + if type(next_hop) is not list: + next_hop = [next_hop] + + found_hops = [rib_r["ip"] for rib_r in + rib_routes_json[st_rt][0][ + "nexthops"]] + for nh in next_hop: + nh_found = False + if nh and nh in found_hops: + nh_found = True + else: + errormsg = ("Nexthop {} is Missing for {}" + " route {} in RIB of router" + " {}\n".format(next_hop, + protocol, + st_rt, dut)) + + return errormsg + else: + missing_routes.append(st_rt) + if nh_found: + logger.info("Found next_hop %s for all routes in RIB of" + " router %s\n", next_hop, dut) + + if not st_found and len(missing_routes) > 0: + errormsg = "Missing route in RIB of router {}, routes: " \ + "{}\n".format(dut, missing_routes) + return errormsg + + logger.info("Verified routes in router %s RIB, found routes" + " are: %s\n", dut, found_routes) + + advertise_network = input_dict[routerInput].setdefault( + "advertise_networks", {}) + if advertise_network: + found_routes = [] + missing_routes = [] + found = False + for advertise_network_dict in advertise_network: + start_ip = advertise_network_dict["network"] + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 0 + + # Generating IPs for verification + ip_list = generate_ips(start_ip, no_of_network) + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + if st_rt in rib_routes_json: + found = True + found_routes.append(st_rt) + else: + missing_routes.append(st_rt) + + if not found and len(missing_routes) > 0: + errormsg = "Missing route in RIB of router {}, are: {}" \ + " \n".format(dut, missing_routes) + return errormsg + + logger.info("Verified routes in router %s RIB, found routes" + " are: %s", dut, found_routes) + + logger.info("Exiting lib API: verify_rib()") + return True + + +def verify_admin_distance_for_static_routes(tgen, input_dict): + """ + API to verify admin distance for static routes as defined in input_dict/ + input JSON by running show ip/ipv6 route json command. + + Parameter + --------- + * `tgen` : topogen object + * `input_dict`: having details like - for which router and static routes + admin dsitance needs to be verified + Usage + ----- + # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop + 10.0.0.2 in router r1 + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "admin_distance": 10, + "next_hop": "10.0.0.2" + }] + } + } + result = verify_admin_distance_for_static_routes(tgen, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_admin_distance_for_static_routes()") + + for router in input_dict.keys(): + if router not in tgen.routers(): + continue + + rnode = tgen.routers()[router] + + for static_route in input_dict[router]["static_routes"]: + addr_type = validate_ip_address(static_route["network"]) + # Command to execute + if addr_type == "ipv4": + command = "show ip route json" + else: + command = "show ipv6 route json" + show_ip_route_json = rnode.vtysh_cmd(command, isjson=True) + + logger.info("Verifying admin distance for static route %s" + " under dut %s:", static_route, router) + network = static_route["network"] + next_hop = static_route["next_hop"] + admin_distance = static_route["admin_distance"] + route_data = show_ip_route_json[network][0] + if network in show_ip_route_json: + if route_data["nexthops"][0]["ip"] == next_hop: + if route_data["distance"] != admin_distance: + errormsg = ("Verification failed: admin distance" + " for static route {} under dut {}," + " found:{} but expected:{}". + format(static_route, router, + route_data["distance"], + admin_distance)) + return errormsg + else: + logger.info("Verification successful: admin" + " distance for static route %s under" + " dut %s, found:%s", static_route, + router, route_data["distance"]) + + else: + errormsg = ("Static route {} not found in " + "show_ip_route_json for dut {}". + format(network, router)) + return errormsg + + logger.info("Exiting lib API: verify_admin_distance_for_static_routes()") + return True + + +def verify_prefix_lists(tgen, input_dict): + """ + Running "show ip prefix-list" command and verifying given prefix-list + is present in router. + + Parameters + ---------- + * `tgen` : topogen object + * `input_dict`: data to verify prefix lists + + Usage + ----- + # To verify pf_list_1 is present in router r1 + input_dict = { + "r1": { + "prefix_lists": ["pf_list_1"] + }} + result = verify_prefix_lists("ipv4", input_dict, tgen) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_prefix_lists()") + + for router in input_dict.keys(): + if router not in tgen.routers(): + continue + + rnode = tgen.routers()[router] + + # Show ip prefix list + show_prefix_list = rnode.vtysh_cmd("show ip prefix-list") + + # Verify Prefix list is deleted + prefix_lists_addr = input_dict[router]["prefix_lists"] + for addr_type in prefix_lists_addr: + if not check_address_types(addr_type): + continue + + for prefix_list in prefix_lists_addr[addr_type].keys(): + if prefix_list in show_prefix_list: + errormsg = ("Prefix list {} is not deleted from router" + " {}".format(prefix_list, router)) + return errormsg + + logger.info("Prefix list %s is/are deleted successfully" + " from router %s", prefix_list, router) + + logger.info("Exiting lib API: verify_prefix_lissts()") + return True diff --git a/tests/topotests/lib/topojson.py b/tests/topotests/lib/topojson.py new file mode 100644 index 0000000000..4130451d2e --- /dev/null +++ b/tests/topotests/lib/topojson.py @@ -0,0 +1,193 @@ +# +# Modified work Copyright (c) 2019 by VMware, Inc. ("VMware") +# Original work Copyright (c) 2018 by Network Device Education +# Foundation, Inc. ("NetDEF") +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +from collections import OrderedDict +from json import dumps as json_dumps +import ipaddr +import pytest + +# Import topogen and topotest helpers +from lib.topolog import logger + +# Required to instantiate the topology builder class. +from lib.common_config import ( + number_to_row, number_to_column, + load_config_to_router, + create_interfaces_cfg, + create_static_routes, + create_prefix_lists, + create_route_maps, +) + +from lib.bgp import create_router_bgp + +def build_topo_from_json(tgen, topo): + """ + Reads configuration from JSON file. Adds routers, creates interface + names dynamically and link routers as defined in JSON to create + topology. Assigns IPs dynamically to all interfaces of each router. + + * `tgen`: Topogen object + * `topo`: json file data + """ + + listRouters = [] + for routerN in sorted(topo['routers'].iteritems()): + logger.info('Topo: Add router {}'.format(routerN[0])) + tgen.add_router(routerN[0]) + listRouters.append(routerN[0]) + + listRouters.sort() + if 'ipv4base' in topo: + ipv4Next = ipaddr.IPv4Address(topo['link_ip_start']['ipv4']) + ipv4Step = 2 ** (32 - topo['link_ip_start']['v4mask']) + if topo['link_ip_start']['v4mask'] < 32: + ipv4Next += 1 + if 'ipv6base' in topo: + ipv6Next = ipaddr.IPv6Address(topo['link_ip_start']['ipv6']) + ipv6Step = 2 ** (128 - topo['link_ip_start']['v6mask']) + if topo['link_ip_start']['v6mask'] < 127: + ipv6Next += 1 + for router in listRouters: + topo['routers'][router]['nextIfname'] = 0 + + while listRouters != []: + curRouter = listRouters.pop(0) + # Physical Interfaces + if 'links' in topo['routers'][curRouter]: + def link_sort(x): + if x == 'lo': + return 0 + elif 'link' in x: + return int(x.split('-link')[1]) + else: + return int(x.split('r')[1]) + for destRouterLink, data in sorted(topo['routers'][curRouter]['links']. \ + iteritems(), + key=lambda x: link_sort(x[0])): + currRouter_lo_json = \ + topo['routers'][curRouter]['links'][destRouterLink] + # Loopback interfaces + if 'type' in data and data['type'] == 'loopback': + if 'ipv4' in currRouter_lo_json and \ + currRouter_lo_json['ipv4'] == 'auto': + currRouter_lo_json['ipv4'] = '{}{}.{}/{}'. \ + format(topo['lo_prefix']['ipv4'], number_to_row(curRouter), \ + number_to_column(curRouter), topo['lo_prefix']['v4mask']) + if 'ipv6' in currRouter_lo_json and \ + currRouter_lo_json['ipv6'] == 'auto': + currRouter_lo_json['ipv6'] = '{}{}:{}/{}'. \ + format(topo['lo_prefix']['ipv6'], number_to_row(curRouter), \ + number_to_column(curRouter), topo['lo_prefix']['v6mask']) + + if "-" in destRouterLink: + # Spliting and storing destRouterLink data in tempList + tempList = destRouterLink.split("-") + + # destRouter + destRouter = tempList.pop(0) + + # Current Router Link + tempList.insert(0, curRouter) + curRouterLink = "-".join(tempList) + else: + destRouter = destRouterLink + curRouterLink = curRouter + + if destRouter in listRouters: + currRouter_link_json = \ + topo['routers'][curRouter]['links'][destRouterLink] + destRouter_link_json = \ + topo['routers'][destRouter]['links'][curRouterLink] + + # Assigning name to interfaces + currRouter_link_json['interface'] = \ + '{}-{}-eth{}'.format(curRouter, destRouter, topo['routers'] \ + [curRouter]['nextIfname']) + destRouter_link_json['interface'] = \ + '{}-{}-eth{}'.format(destRouter, curRouter, topo['routers'] \ + [destRouter]['nextIfname']) + + topo['routers'][curRouter]['nextIfname'] += 1 + topo['routers'][destRouter]['nextIfname'] += 1 + + # Linking routers to each other as defined in JSON file + tgen.gears[curRouter].add_link(tgen.gears[destRouter], + topo['routers'][curRouter]['links'][destRouterLink] \ + ['interface'], topo['routers'][destRouter]['links'] \ + [curRouterLink]['interface']) + + # IPv4 + if 'ipv4' in currRouter_link_json: + if currRouter_link_json['ipv4'] == 'auto': + currRouter_link_json['ipv4'] = \ + '{}/{}'.format(ipv4Next, topo['link_ip_start'][ \ + 'v4mask']) + destRouter_link_json['ipv4'] = \ + '{}/{}'.format(ipv4Next + 1, topo['link_ip_start'][ \ + 'v4mask']) + ipv4Next += ipv4Step + # IPv6 + if 'ipv6' in currRouter_link_json: + if currRouter_link_json['ipv6'] == 'auto': + currRouter_link_json['ipv6'] = \ + '{}/{}'.format(ipv6Next, topo['link_ip_start'][ \ + 'v6mask']) + destRouter_link_json['ipv6'] = \ + '{}/{}'.format(ipv6Next + 1, topo['link_ip_start'][ \ + 'v6mask']) + ipv6Next = ipaddr.IPv6Address(int(ipv6Next) + ipv6Step) + + logger.debug("Generated link data for router: %s\n%s", curRouter, + json_dumps(topo["routers"][curRouter]["links"], + indent=4, sort_keys=True)) + + +def build_config_from_json(tgen, topo, save_bkup=True): + """ + Reads initial configuraiton from JSON for each router, builds + configuration and loads its to router. + + * `tgen`: Topogen object + * `topo`: json file data + """ + + func_dict = OrderedDict([ + ("links", create_interfaces_cfg), + ("static_routes", create_static_routes), + ("prefix_lists", create_prefix_lists), + ("route_maps", create_route_maps), + ("bgp", create_router_bgp) + ]) + + data = topo["routers"] + for func_type in func_dict.keys(): + logger.info('Building configuration for {}'.format(func_type)) + + func_dict.get(func_type)(tgen, data, build=True) + + for router in sorted(topo['routers'].keys()): + logger.info('Configuring router {}...'.format(router)) + + result = load_config_to_router(tgen, router, save_bkup) + if not result: + logger.info("Failed while configuring {}".format(router)) + pytest.exit(1) + diff --git a/tests/topotests/pytest.ini b/tests/topotests/pytest.ini index 119ab93857..7ea38491d8 100644 --- a/tests/topotests/pytest.ini +++ b/tests/topotests/pytest.ini @@ -10,6 +10,13 @@ norecursedirs = .git example-test lib docker # value is 'info', but can be changed to 'debug' to provide more details. #verbosity = info +# Save logs to log file, by default logs will be displayed to console +#frrtest_log_dir = /tmp/topotests/ + +# Display router current configuration during test execution, +# by default configuration will not be shown +show_router_config = True + # Default daemons binaries path. #frrdir = /usr/lib/frr #quaggadir = /usr/lib/quagga |
