summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/topotests/bgp-basic-functionality-topo1/__init__.py0
-rw-r--r--tests/topotests/bgp-basic-functionality-topo1/bgp_basic_functionality.json172
-rwxr-xr-xtests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py595
-rw-r--r--tests/topotests/bgp-path-attributes-topo1/__init__.py0
-rw-r--r--tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json220
-rwxr-xr-xtests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py1078
-rw-r--r--tests/topotests/bgp-prefix-list-topo1/__init__.py0
-rw-r--r--tests/topotests/bgp-prefix-list-topo1/prefix_lists.json123
-rwxr-xr-xtests/topotests/bgp-prefix-list-topo1/test_prefix_lists.py1450
-rw-r--r--tests/topotests/bgp_multiview_topo1/README.md2
-rwxr-xr-xtests/topotests/example-topojson-test/__init__.py0
-rwxr-xr-xtests/topotests/example-topojson-test/test_topo_json_multiple_links/__init__.py0
-rw-r--r--tests/topotests/example-topojson-test/test_topo_json_multiple_links/example_topojson_multiple_links.json152
-rwxr-xr-xtests/topotests/example-topojson-test/test_topo_json_multiple_links/test_example_topojson_multiple_links.py194
-rwxr-xr-xtests/topotests/example-topojson-test/test_topo_json_single_link/__init__.py0
-rw-r--r--tests/topotests/example-topojson-test/test_topo_json_single_link/example_topojson.json153
-rwxr-xr-xtests/topotests/example-topojson-test/test_topo_json_single_link/test_example_topojson.py190
-rwxr-xr-xtests/topotests/example-topojson-test/test_topo_json_single_link_loopback/__init__.py0
-rw-r--r--tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/example_topojson.json161
-rwxr-xr-xtests/topotests/example-topojson-test/test_topo_json_single_link_loopback/test_example_topojson.py205
-rw-r--r--tests/topotests/lib/bgp.py1521
-rw-r--r--tests/topotests/lib/common_config.py1391
-rw-r--r--tests/topotests/lib/topojson.py193
-rw-r--r--tests/topotests/lib/topotest.py2
-rw-r--r--tests/topotests/ospf6-topo1/README.md2
-rw-r--r--tests/topotests/pytest.ini7
26 files changed, 7808 insertions, 3 deletions
diff --git a/tests/topotests/bgp-basic-functionality-topo1/__init__.py b/tests/topotests/bgp-basic-functionality-topo1/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/topotests/bgp-basic-functionality-topo1/__init__.py
diff --git a/tests/topotests/bgp-basic-functionality-topo1/bgp_basic_functionality.json b/tests/topotests/bgp-basic-functionality-topo1/bgp_basic_functionality.json
new file mode 100644
index 0000000000..c778ae4bed
--- /dev/null
+++ b/tests/topotests/bgp-basic-functionality-topo1/bgp_basic_functionality.json
@@ -0,0 +1,172 @@
+{
+ "ipv4base": "10.0.0.0",
+ "ipv4mask": 30,
+ "ipv6base": "fd00::",
+ "ipv6mask": 64,
+ "link_ip_start": {
+ "ipv4": "10.0.0.0",
+ "v4mask": 30,
+ "ipv6": "fd00::",
+ "v6mask": 64
+ },
+ "lo_prefix": {
+ "ipv4": "1.0.",
+ "v4mask": 32,
+ "ipv6": "2001:DB8:F::",
+ "v6mask": 128
+ },
+ "routers": {
+ "r1": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r4": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r2": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r4": {
+ "dest_link": {
+ "r3": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r4": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "200",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r4": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
new file mode 100755
index 0000000000..095ebe3344
--- /dev/null
+++ b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
@@ -0,0 +1,595 @@
+#!/usr/bin/env python
+
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation,
+# Inc. ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test BGP basic functionality:
+
+Test steps
+- Create topology (setup module)
+ Creating 4 routers topology, r1, r2, r3 are in IBGP and
+ r3, r4 are in EBGP
+- Bring up topology
+- Verify for bgp to converge
+- Modify/Delete and verify router-id
+- Modify and verify bgp timers
+- Create and verify static routes
+- Modify and verify admin distance for existing static routes
+- Test advertise network using network command
+- Verify clear bgp
+- Test bgp convergence with loopback interface
+- Test advertise network using network command
+"""
+
+import os
+import sys
+import json
+import time
+import pytest
+from copy import deepcopy
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, '../'))
+sys.path.append(os.path.join(CWD, '../lib/'))
+
+# Required to instantiate the topology builder class.
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib.topogen import Topogen, get_topogen
+from mininet.topo import Topo
+
+from lib.common_config import (
+ start_topology, write_test_header,
+ write_test_footer, reset_config_on_routers, create_static_routes,
+ verify_rib, verify_admin_distance_for_static_routes
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence, create_router_bgp, verify_router_id,
+ modify_as_number, verify_as_numbers, clear_bgp_and_verify,
+ verify_bgp_timers_and_functionality
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Reading the data from JSON File for topology creation
+jsonFile = "{}/bgp_basic_functionality.json".format(CWD)
+try:
+ with open(jsonFile, 'r') as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ assert False, "Could not read file {}".format(jsonFile)
+
+
+class CreateTopo(Topo):
+ """
+ Test BasicTopo - topology 1
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ """Build function"""
+ tgen = get_topogen(self)
+
+ # Building topology from json file
+ build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("=" * 40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(CreateTopo, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ global BGP_CONVERGENCE
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}". \
+ format(BGP_CONVERGENCE)
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module():
+ """Teardown the pytest environment"""
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info("Testsuite end time: {}".
+ format(time.asctime(time.localtime(time.time()))))
+ logger.info("=" * 40)
+
+
+#####################################################
+#
+# Testcases
+#
+#####################################################
+
+
+def test_modify_and_delete_router_id(request):
+ """ Test to modify, delete and verify router-id. """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Modify router id
+ input_dict = {
+ 'r1': {
+ "bgp": {
+ 'router_id': '12.12.12.12'
+ }
+ },
+ 'r2': {
+ "bgp": {
+ 'router_id': '22.22.22.22'
+ }
+ },
+ 'r3': {
+ "bgp": {
+ 'router_id': '33.33.33.33'
+ }
+ },
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}".\
+ format(tc_name, result)
+
+ # Verifying router id once modified
+ result = verify_router_id(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}".\
+ format(tc_name, result)
+
+ # Delete router id
+ input_dict = {
+ 'r1': {
+ "bgp": {
+ 'del_router_id': True
+ }
+ },
+ 'r2': {
+ "bgp": {
+ 'del_router_id': True
+ }
+ },
+ 'r3': {
+ "bgp": {
+ 'del_router_id': True
+ }
+ },
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Verifying router id once deleted
+ # Once router-id is deleted, highest interface ip should become
+ # router-id
+ result = verify_router_id(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_bgp_config_with_4byte_as_number(request):
+ """
+ Configure BGP with 4 byte ASN and verify it works fine
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "local_as": 131079
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": 131079
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": 131079
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": 131080
+ }
+ }
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ result = verify_as_numbers(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_bgp_timers_functionality(request):
+ """
+ Test to modify bgp timers and verify timers functionality.
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to modfiy BGP timerse
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link":{
+ "r1": {
+ "keepalivetimer": 60,
+ "holddowntimer": 180,
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, deepcopy(input_dict))
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Api call to clear bgp, so timer modification would take place
+ clear_bgp_and_verify(tgen, topo, 'r1')
+
+ # Verifying bgp timers functionality
+ result = verify_bgp_timers_and_functionality(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+
+
+def test_static_routes(request):
+ """ Test to create and verify static routes. """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to create static routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "no_of_ip": 9,
+ "admin_distance": 100,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Api call to redistribute static routes
+ input_dict_1 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Verifying RIB routes
+ dut = 'r3'
+ protocol = 'bgp'
+ next_hop = '10.0.0.2'
+ result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop,
+ protocol=protocol)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_admin_distance_for_existing_static_routes(request):
+ """ Test to modify and verify admin distance for existing static routes."""
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "admin_distance": 10,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Verifying admin distance once modified
+ result = verify_admin_distance_for_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_advertise_network_using_network_command(request):
+ """ Test advertise networks using network command."""
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "20.0.0.0/32",
+ "no_of_network": 10
+ },
+ {
+ "network": "30.0.0.0/32",
+ "no_of_network": 10
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Verifying RIB routes
+ dut = 'r2'
+ protocol = "bgp"
+ result = verify_rib(tgen, 'ipv4', dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_clear_bgp_and_verify(request):
+ """
+ Created few static routes and verified all routes are learned via BGP
+ cleared BGP and verified all routes are intact
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # clear ip bgp
+ result = clear_bgp_and_verify(tgen, topo, 'r1')
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_bgp_with_loopback_interface(request):
+ """
+ Test BGP with loopback interface
+
+ Adding keys:value pair "dest_link": "lo" and "source_link": "lo"
+ peer dict of input json file for all router's creating config using
+ loopback interface. Once BGP neighboship is up then verifying BGP
+ convergence
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ for routerN in sorted(topo['routers'].keys()):
+ for bgp_neighbor in \
+ topo['routers'][routerN]['bgp']['address_family']['ipv4'][
+ 'unicast']['neighbor'].keys():
+
+ # Adding ['source_link'] = 'lo' key:value pair
+ topo['routers'][routerN]['bgp']['address_family']['ipv4'][
+ 'unicast']['neighbor'][bgp_neighbor]["dest_link"] = {
+ 'lo': {
+ "source_link": "lo",
+ }
+ }
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "1.0.2.17/32",
+ "next_hop": "10.0.0.2"
+ },
+ {
+ "network": "1.0.3.17/32",
+ "next_hop": "10.0.0.6"
+ }
+ ]
+ },
+ "r2": {
+ "static_routes": [{
+ "network": "1.0.1.17/32",
+ "next_hop": "10.0.0.1"
+ },
+ {
+ "network": "1.0.3.17/32",
+ "next_hop": "10.0.0.10"
+ }
+ ]
+ },
+ "r3": {
+ "static_routes": [{
+ "network": "1.0.1.17/32",
+ "next_hop": "10.0.0.5"
+ },
+ {
+ "network": "1.0.2.17/32",
+ "next_hop": "10.0.0.9"
+ },
+ {
+ "network": "1.0.4.17/32",
+ "next_hop": "10.0.0.14"
+ }
+ ]
+ },
+ "r4": {
+ "static_routes": [{
+ "network": "1.0.3.17/32",
+ "next_hop": "10.0.0.13"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Api call verify whether BGP is converged
+ result = verify_bgp_convergence(tgen, topo)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+if __name__ == '__main__':
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/bgp-path-attributes-topo1/__init__.py b/tests/topotests/bgp-path-attributes-topo1/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/topotests/bgp-path-attributes-topo1/__init__.py
diff --git a/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json b/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json
new file mode 100644
index 0000000000..15b7ec13be
--- /dev/null
+++ b/tests/topotests/bgp-path-attributes-topo1/bgp_path_attributes.json
@@ -0,0 +1,220 @@
+{
+ "ipv4base":"10.0.0.0",
+ "ipv4mask":30,
+ "ipv6base":"fd00::",
+ "ipv6mask":64,
+ "link_ip_start":{"ipv4":"10.0.0.0", "v4mask":30, "ipv6":"fd00::", "v6mask":64},
+ "lo_prefix":{"ipv4":"1.0.", "v4mask":32, "ipv6":"2001:DB8:F::", "v6mask":128},
+ "routers":{
+ "r1":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r2":{"ipv4":"auto", "ipv6":"auto"},
+ "r3":{"ipv4":"auto", "ipv6":"auto"}
+ },
+ "bgp":{
+ "local_as":"555",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r1": {"ipv4": "auto", "ipv6": "auto"},
+ "r3": {"ipv4": "auto", "ipv6": "auto"},
+ "r4-link1": {"ipv4": "auto", "ipv6": "auto"},
+ "r4-link2": {"ipv4": "auto", "ipv6": "auto"}
+ },
+ "bgp":{
+ "local_as":"555",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2": {}
+ }
+ },
+ "r4": {
+ "dest_link": {
+ "r2-link1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r1":{"ipv4":"auto", "ipv6":"auto"},
+ "r2":{"ipv4":"auto", "ipv6":"auto"},
+ "r5":{"ipv4":"auto", "ipv6":"auto"}
+ },
+ "bgp":{
+ "local_as":"555",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r2": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r5": {
+ "dest_link": {
+ "r3": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r4":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r2-link1": {"ipv4": "auto", "ipv6": "auto"},
+ "r2-link2": {"ipv4": "auto", "ipv6": "auto"},
+ "r6": {"ipv4": "auto", "ipv6": "auto"}
+ },
+ "bgp": {
+ "local_as": "666",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r4-link1": {}
+ }
+ },
+ "r6": {
+ "dest_link": {
+ "r4": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r5":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r3": {"ipv4": "auto", "ipv6": "auto"},
+ "r7": {"ipv4": "auto", "ipv6": "auto"}
+ },
+ "bgp":{
+ "local_as":"666",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r5": {}
+ }
+ },
+ "r7": {
+ "dest_link": {
+ "r5": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r6":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r4": {"ipv4": "auto", "ipv6": "auto"},
+ "r7": {"ipv4": "auto", "ipv6": "auto"}
+ },
+ "bgp":{
+ "local_as":"777",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r4": {
+ "dest_link": {
+ "r6": {}
+ }
+ },
+ "r7": {
+ "dest_link": {
+ "r6": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r7":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback"},
+ "r5": {"ipv4": "auto", "ipv6": "auto"},
+ "r6": {"ipv4": "auto", "ipv6": "auto"}
+ },
+ "bgp":{
+ "local_as":"888",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r5": {
+ "dest_link": {
+ "r7": {}
+ }
+ },
+ "r6": {
+ "dest_link": {
+ "r7": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py
new file mode 100755
index 0000000000..abd6b396d1
--- /dev/null
+++ b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py
@@ -0,0 +1,1078 @@
+#!/usr/bin/env python
+
+#
+# Modified work Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Original work Copyright (c) 2018 by Network Device Education
+# Foundation, Inc. ("NetDEF")
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test AS-Path functionality:
+
+Setup module:
+- Create topology (setup module)
+- Bring up topology
+- Verify BGP convergence
+
+Test cases:
+1. Test next_hop attribute and verify best path is installed as per
+ reachable next_hop
+2. Test aspath attribute and verify best path is installed as per
+ shortest AS-Path
+3. Test localpref attribute and verify best path is installed as per
+ shortest local-preference
+4. Test weight attribute and and verify best path is installed as per
+ highest weight
+5. Test origin attribute and verify best path is installed as per
+ IGP>EGP>INCOMPLETE rule
+6. Test med attribute and verify best path is installed as per lowest
+ med value
+7. Test admin distance and verify best path is installed as per lowest
+ admin distance
+
+Teardown module:
+- Bring down the topology
+- stop routers
+
+"""
+
+import os
+import sys
+import pdb
+import json
+import time
+import inspect
+import ipaddress
+from time import sleep
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from mininet.topo import Topo
+from lib import topotest
+from lib.topogen import Topogen, TopoRouter, get_topogen
+
+# Required to instantiate the topology builder class.
+from lib.common_config import (
+ start_topology, write_test_header,
+ write_test_footer, reset_config_on_routers,
+ verify_rib, create_static_routes,
+ create_prefix_lists, verify_prefix_lists,
+ create_route_maps
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence, create_router_bgp,
+ clear_bgp_and_verify, verify_best_path_as_per_bgp_attribute,
+ verify_best_path_as_per_admin_distance
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Reading the data from JSON File for topology creation
+jsonFile = "{}/bgp_path_attributes.json".format(CWD)
+
+try:
+ with open(jsonFile, "r") as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ assert False, "Could not read file {}".format(jsonFile)
+
+
+####
+class CreateTopo(Topo):
+ """
+ Test CreateTopo - topology 1
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ "Build function"
+ tgen = get_topogen(self)
+
+ # Building topology and configuration from json file
+ build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: %s", testsuite_run_time)
+ logger.info("=" * 40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(CreateTopo, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Checking BGP convergence
+ result = verify_bgp_convergence(tgen, topo)
+ assert result is True, ("setup_module :Failed \n Error:"
+ " {}".format(result))
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module():
+ """
+ Teardown the pytest environment
+ """
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info("Testsuite end time: %s",
+ time.asctime(time.localtime(time.time())))
+ logger.info("=" * 40)
+
+
+#####################################################
+##
+## Testcases
+##
+#####################################################
+def test_next_hop_attribute(request):
+ """
+ Verifying route are not getting installed in, as next_hop is
+ unreachable, Making next hop reachable using next_hop_self
+ command and verifying routes are installed.
+ """
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r7": {
+ "bgp":{
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r1"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: %s", result)
+
+ # Configure next-hop-self to bgp neighbor
+ input_dict_1 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r1"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_aspath_attribute(request):
+ " Verifying AS_PATH attribute functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r7": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "aspath"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_localpref_attribute(request):
+ " Verifying LOCAL PREFERENCE attribute functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r7": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create Prefix list
+ input_dict_2 = {
+ "r2": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_ls_1": [{
+ "seqid": 10,
+ "network": "200.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create route map
+ input_dict_3 = {
+ "r2": {
+ "route_maps": {
+ "RMAP_LOCAL_PREF": [{
+ "action": "permit",
+ "match": {
+ "ipv4": {
+ "prefix_lists": "pf_ls_1"
+ }
+ },
+ "set": {
+ "localpref": 1000
+ }
+ }]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure neighbor for route map
+ input_dict_4 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {
+ "route_maps": [
+ {"name": "RMAP_LOCAL_PREF",
+ "direction": "in"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "localpref"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_weight_attribute(request):
+ " Verifying WEIGHT attribute functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r7": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create Prefix list
+ input_dict_2 = {
+ "r1": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_ls_1": [{
+ "seqid": 10,
+ "network": "200.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create route map
+ input_dict_3 = {
+ "r1": {
+ "route_maps": {
+ "RMAP_WEIGHT": [{
+ "action": "permit",
+ "match": {
+ "ipv4": {
+ "prefix_lists": "pf_ls_1"
+ }
+ },
+ "set": {
+ "weight": 500
+ }
+ }]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure neighbor for route map
+ input_dict_4 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {
+ "route_maps": [
+ {"name": "RMAP_WEIGHT",
+ "direction": "in"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "weight"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_origin_attribute(request):
+ " Verifying ORIGIN attribute functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r4": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r5": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to create static routes
+ input_dict_3 = {
+ "r5": {
+ "static_routes": [
+ {
+ "network": "200.50.2.0/32",
+ "next_hop": "10.0.0.26"
+ },
+ {
+ "network": "200.60.2.0/32",
+ "next_hop": "10.0.0.26"
+ }
+ ]
+ }
+ }
+ result = create_static_routes(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+
+ # Configure next-hop-self to bgp neighbor
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "origin"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_med_attribute(request):
+ " Verifying MED attribute functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to advertise networks
+ input_dict = {
+ "r4": {
+ "bgp":{
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to advertise networks
+
+
+ # Configure next-hop-self to bgp neighbor
+
+
+ # Create Prefix list
+ input_dict_3 = {
+ "r2": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_ls_r2": [{
+ "seqid": 10,
+ "network": "200.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ },
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_ls_r3": [{
+ "seqid": 10,
+ "network": "200.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create route map
+ input_dict_3 = {
+ "r2": {
+ "route_maps": {
+ "RMAP_MED_R2": [{
+ "action": "permit",
+ "match": {
+ "ipv4": {
+ "prefix_lists": "pf_ls_r2"
+ }
+ },
+ "set": {
+ "med": 100
+ }
+ }]
+ }
+ },
+ "r3": {
+ "route_maps": {
+ "RMAP_MED_R3": [{
+ "action": "permit",
+ "match": {
+ "ipv4": {
+ "prefix_lists": "pf_ls_r3"
+ }
+ },
+ "set": {
+ "med": 10
+ }
+ }]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure neighbor for route map
+ input_dict_4 = {
+ "r5": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r4": {
+ "dest_link": {
+ "r2-link1": {
+ "route_maps": [
+ {"name": "RMAP_MED_R2",
+ "direction": "in"}
+ ]
+ }
+ }
+ },
+ "r1": {
+ "dest_link": {
+ "r2": {"next_hop_self": True}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {"next_hop_self": True}
+ }
+ },
+ "r5": {
+ "dest_link": {
+ "r3": {
+ "route_maps": [
+ {"name": "RMAP_MED_R3",
+ "direction": "in"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "med"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ logger.info("Testcase %s :Passed \n", tc_name)
+
+ # Uncomment next line for debugging
+ # tgen.mininet_cli()
+
+
+def test_admin_distance(request):
+ " Verifying admin distance functionality"
+
+ tgen = get_topogen()
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to create static routes
+ input_dict = {
+ "r2": {
+ "static_routes": [
+ {
+ "network": "200.50.2.0/32",
+ "admin_distance": 80,
+ "next_hop": "10.0.0.14"
+ },
+ {
+ "network": "200.50.2.0/32",
+ "admin_distance": 60,
+ "next_hop": "10.0.0.18"
+ }
+ ]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+ input_dict_2 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying best path
+ dut = "r1"
+ attribute = "admin_distance"
+ result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut,
+ input_dict, attribute)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/bgp-prefix-list-topo1/__init__.py b/tests/topotests/bgp-prefix-list-topo1/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/topotests/bgp-prefix-list-topo1/__init__.py
diff --git a/tests/topotests/bgp-prefix-list-topo1/prefix_lists.json b/tests/topotests/bgp-prefix-list-topo1/prefix_lists.json
new file mode 100644
index 0000000000..3bb07ad994
--- /dev/null
+++ b/tests/topotests/bgp-prefix-list-topo1/prefix_lists.json
@@ -0,0 +1,123 @@
+{
+ "address_types": ["ipv4"],
+ "ipv4base":"10.0.0.0",
+ "ipv4mask":30,
+ "ipv6base":"fd00::",
+ "ipv6mask":64,
+ "link_ip_start":{"ipv4":"10.0.0.0", "v4mask":30, "ipv6":"fd00::", "v6mask":64},
+ "lo_prefix":{"ipv4":"1.0.", "v4mask":32, "ipv6":"2001:DB8:F::", "v6mask":128},
+ "routers":{
+ "r1":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback", "add_static_route":"yes"},
+ "r2":{"ipv4":"auto", "ipv6":"auto"},
+ "r3":{"ipv4":"auto", "ipv6":"auto"}
+ },
+ "bgp":{
+ "local_as":"100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback", "add_static_route":"yes"},
+ "r1":{"ipv4":"auto", "ipv6":"auto"},
+ "r3":{"ipv4":"auto", "ipv6":"auto"}
+ },
+ "bgp":{
+ "local_as":"100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback", "add_static_route":"yes"},
+ "r1":{"ipv4":"auto", "ipv6":"auto"},
+ "r2":{"ipv4":"auto", "ipv6":"auto"},
+ "r4":{"ipv4":"auto", "ipv6":"auto"}
+ },
+ "bgp":{
+ "local_as":"100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r2": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r4": {
+ "dest_link": {
+ "r3": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r4":{
+ "links":{
+ "lo": {"ipv4": "auto", "ipv6": "auto", "type": "loopback", "add_static_route":"yes"},
+ "r3":{"ipv4":"auto", "ipv6":"auto"}
+ },
+ "bgp":{
+ "local_as":"200",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r4": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/tests/topotests/bgp-prefix-list-topo1/test_prefix_lists.py b/tests/topotests/bgp-prefix-list-topo1/test_prefix_lists.py
new file mode 100755
index 0000000000..25a346f20d
--- /dev/null
+++ b/tests/topotests/bgp-prefix-list-topo1/test_prefix_lists.py
@@ -0,0 +1,1450 @@
+#!/usr/bin/python
+
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation,
+# Inc. ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test prefix-list functionality:
+
+Test steps
+- Create topology (setup module)
+ Creating 4 routers topology, r1, r2, r3 are in IBGP and
+ r3, r4 are in EBGP
+- Bring up topology
+- Verify for bgp to converge
+
+IP prefix-list tests
+- Test ip prefix-lists IN permit
+- Test ip prefix-lists OUT permit
+- Test ip prefix-lists IN deny and permit any
+- Test delete ip prefix-lists
+- Test ip prefix-lists OUT deny and permit any
+- Test modify ip prefix-lists IN permit to deny
+- Test modify ip prefix-lists IN deny to permit
+- Test modify ip prefix-lists OUT permit to deny
+- Test modify prefix-lists OUT deny to permit
+- Test ip prefix-lists implicit deny
+"""
+
+import sys
+import json
+import time
+import os
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from mininet.topo import Topo
+from lib.topogen import Topogen, get_topogen
+
+# Import topoJson from lib, to create topology and initial configuration
+from lib.common_config import (
+ start_topology, write_test_header,
+ write_test_footer, reset_config_on_routers,
+ verify_rib, create_static_routes,
+ create_prefix_lists, verify_prefix_lists
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence, create_router_bgp,
+ clear_bgp_and_verify
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Reading the data from JSON File for topology creation
+jsonFile = "{}/prefix_lists.json".format(CWD)
+
+try:
+ with open(jsonFile, "r") as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ assert False, "Could not read file {}".format(jsonFile)
+
+# Global variables
+bgp_convergence = False
+
+
+class BGPPrefixListTopo(Topo):
+ """
+ Test BGPPrefixListTopo - topology 1
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ "Build function"
+ tgen = get_topogen(self)
+
+ # Building topology from json file
+ build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("="*40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(BGPPrefixListTopo, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ # Checking BGP convergence
+ global BGP_CONVERGENCE
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Api call verify whether BGP is converged
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, ("setup_module :Failed \n Error:"
+ " {}".format(BGP_CONVERGENCE))
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module(mod):
+ """
+ Teardown the pytest environment
+
+ * `mod`: module name
+ """
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info("Testsuite end time: {}".
+ format(time.asctime(time.localtime(time.time()))))
+ logger.info("="*40)
+
+#####################################################
+#
+# Tests starting
+#
+#####################################################
+
+
+def test_ip_prefix_lists_in_permit(request):
+ """
+ Create ip prefix list and test permit prefixes IN direction
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Create Static routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "20.0.20.1/32",
+ "no_of_ip": 1,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create ip prefix list
+ input_dict_2 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [{
+ "seqid": 10,
+ "network": "any",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure bgp neighbor with prefix list
+ input_dict_3 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {
+ "prefix_lists": [
+ {
+ "name": "pf_list_1",
+ "direction": "in"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r3"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_ip_prefix_lists_out_permit(request):
+ """
+ Create ip prefix list and test permit prefixes out direction
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Create Static routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "no_of_ip": 1,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create Static routes
+ input_dict_1 = {
+ "r1": {
+ "static_routes": [{
+ "network": "20.0.20.1/32",
+ "no_of_ip": 1,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ input_dict_5 = {
+ "r3": {
+ "static_routes": [{
+ "network": "10.0.0.2/30",
+ "no_of_ip": 1,
+ "next_hop": "10.0.0.9"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict_5)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+
+ # Create ip prefix list
+ input_dict_2 = {
+ "r1": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [{
+ "seqid": 10,
+ "network": "20.0.20.1/32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure prefix list to bgp neighbor
+ # Configure bgp neighbor with prefix list
+ input_dict_3 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r1": {
+ "prefix_lists": [
+ {
+ "name": "pf_list_1",
+ "direction": "out"
+ }
+ ]
+ }
+ }
+ }
+ },
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r3"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict_1, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+ write_test_footer(tc_name)
+
+
+def test_ip_prefix_lists_in_deny_and_permit_any(request):
+ """
+ Create ip prefix list and test permit/deny prefixes IN direction
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Create Static Routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "no_of_ip": 1,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+ # Create ip prefix list
+ input_dict_2 = {
+ "r1": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [
+ {
+ "seqid": "10",
+ "network": "10.0.20.1/32",
+ "action": "deny"
+ },
+ {
+ "seqid": "11",
+ "network": "any",
+ "action": "permit"
+ }
+ ]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure bgp neighbor with prefix list
+ input_dict_3 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {
+ "prefix_lists": [
+ {
+ "name": "pf_list_1",
+ "direction": "in"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ # Configure prefix list to bgp neighbor
+ result = create_router_bgp(tgen, topo, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r3"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_delete_prefix_lists(request):
+ """
+ Delete ip prefix list
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Create ip prefix list
+ input_dict_2 = {
+ "r1": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [
+ {
+ "seqid": "10",
+ "network": "10.0.20.1/32",
+ "action": "deny"
+ }
+ ]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ result = verify_prefix_lists(tgen, input_dict_2)
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+ logger.info(result)
+
+ # Delete prefix list
+ input_dict_2 = {
+ "r1": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [
+ {
+ "seqid": "10",
+ "network": "10.0.20.1/32",
+ "action": "deny",
+ "delete": True
+ }
+ ]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ result = verify_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_ip_prefix_lists_out_deny_and_permit_any(request):
+ """
+ Create ip prefix list and test deny/permit any prefixes OUT direction
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Create Static Routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "no_of_ip": 9,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create Static Routes
+ input_dict_1 = {
+ "r2": {
+ "static_routes": [{
+ "network": "20.0.20.1/32",
+ "no_of_ip": 9,
+ "next_hop": "10.0.0.1"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+
+ # Create ip prefix list
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [
+ {
+ "seqid": "10",
+ "network": "10.0.0.0/8",
+ "le": "32",
+ "action": "deny"
+ },
+ {
+ "seqid": "11",
+ "network": "any",
+ "action": "permit"
+ }
+ ]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure prefix list to bgp neighbor
+ input_dict_4 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r4": {
+ "dest_link": {
+ "r3": {
+ "prefix_lists": [
+ {
+ "name": "pf_list_1",
+ "direction": "out"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r4"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict_1, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r4"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_modify_prefix_lists_in_permit_to_deny(request):
+ """
+ Modify ip prefix list and test permit to deny prefixes IN direction
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Create Static Routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "no_of_ip": 9,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+
+ # Create ip prefix list
+ input_dict_2 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [{
+ "seqid": "10",
+ "network": "10.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure prefix list to bgp neighbor
+ input_dict_3 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link":{
+ "r3": {
+ "prefix_lists": [
+ {
+ "name": "pf_list_1",
+ "direction": "in"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r3"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Modify prefix list
+ input_dict_1 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [
+ {
+ "seqid": "10",
+ "network": "10.0.0.0/8",
+ "le": "32",
+ "action": "deny"
+ },
+ {
+ "seqid": "11",
+ "network": "any",
+ "action": "permit"
+ }
+ ]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to clear bgp, so config changes would be reflected
+ dut = "r3"
+ result = clear_bgp_and_verify(tgen, topo, dut)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r3"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_modify_prefix_lists_in_deny_to_permit(request):
+ """
+ Modify ip prefix list and test deny to permit prefixes IN direction
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Create Static Routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "no_of_ip": 9,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+
+ # Create ip prefix list
+ input_dict_1 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [
+ {
+ "seqid": "10",
+ "network": "10.0.0.0/8",
+ "le": "32",
+ "action": "deny"
+ },
+ {
+ "seqid": "11",
+ "network": "any",
+ "action": "permit"
+ }
+ ]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure prefix list to bgp neighbor
+ input_dict_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {
+ "prefix_lists": [
+ {
+ "name": "pf_list_1",
+ "direction": "in"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r3"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Modify ip prefix list
+ input_dict_1 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [{
+ "seqid": "10",
+ "network": "10.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+
+ }
+ result = create_prefix_lists(tgen, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to clear bgp, so config changes would be reflected
+ dut = "r3"
+ result = clear_bgp_and_verify(tgen, topo, dut)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r3"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_modify_prefix_lists_out_permit_to_deny(request):
+ """
+ Modify ip prefix list and test permit to deny prefixes OUT direction
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Create Static Routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "no_of_ip": 9,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+
+ # Create ip prefix list
+ input_dict_1 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [{
+ "seqid": "10",
+ "network": "10.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+
+ }
+ result = create_prefix_lists(tgen, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure prefix list to bgp neighbor
+ input_dict_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r4": {
+ "dest_link": {
+ "r3": {
+ "prefix_lists": [
+ {
+ "name": "pf_list_1",
+ "direction": "out"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r4"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Modify ip prefix list
+ input_dict_1 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [
+ {
+ "seqid": "10",
+ "network": "10.0.0.0/8",
+ "le": "32",
+ "action": "deny"
+ },
+ {
+ "seqid": "11",
+ "network": "any",
+ "action": "permit"
+ }
+ ]
+ }
+ }
+ }
+
+ }
+ result = create_prefix_lists(tgen, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to clear bgp, so config changes would be reflected
+ dut = "r3"
+ result = clear_bgp_and_verify(tgen, topo, dut)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r4"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_modify_prefix_lists_out_deny_to_permit(request):
+ """
+ Modify ip prefix list and test deny to permit prefixes OUT direction
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Create Static Routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "no_of_ip": 9,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+ # Create ip prefix list
+ input_dict_1 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [
+ {
+ "seqid": "10",
+ "network": "10.0.0.0/8",
+ "le": "32",
+ "action": "deny"
+ },
+ {
+ "seqid": "11",
+ "network": "any",
+ "action": "permit"
+ }
+ ]
+ }
+ }
+ }
+
+ }
+ result = create_prefix_lists(tgen, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure prefix list to bgp neighbor
+ input_dict_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r4": {
+ "dest_link":{
+ "r3": {
+ "prefix_lists": [
+ {
+ "name": "pf_list_1",
+ "direction": "out"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r4"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Modify ip prefix list
+ input_dict_1 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [{
+ "seqid": "10",
+ "network": "10.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+
+ }
+ result = create_prefix_lists(tgen, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to clear bgp, so config changes would be reflected
+ dut = "r3"
+ result = clear_bgp_and_verify(tgen, topo, dut)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r4"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_ip_prefix_lists_implicit_deny(request):
+ """
+ Create ip prefix list and test implicit deny
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Create Static Routes
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "no_of_ip": 9,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Create Static Routes
+ input_dict_1 = {
+ "r2": {
+ "static_routes": [{
+ "network": "20.0.20.1/32",
+ "no_of_ip": 9,
+ "next_hop": "10.0.0.1"
+ }]
+ }
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Api call to redistribute static routes
+ # Create ip prefix list
+ input_dict_3 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "pf_list_1": [{
+ "seqid": "10",
+ "network": "10.0.0.0/8",
+ "le": "32",
+ "action": "permit"
+ }]
+ }
+ }
+ }
+
+ }
+ result = create_prefix_lists(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Configure prefix list to bgp neighbor
+ input_dict_4 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ]
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r4": {
+ "dest_link": {
+ "r3": {
+ "prefix_lists": [
+ {
+ "name": "pf_list_1",
+ "direction": "out"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r4"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ # Verifying RIB routes
+ dut = "r4"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict_1, protocol=protocol)
+ assert result is not True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/bgp_multiview_topo1/README.md b/tests/topotests/bgp_multiview_topo1/README.md
index c5e615d252..2a2747344a 100644
--- a/tests/topotests/bgp_multiview_topo1/README.md
+++ b/tests/topotests/bgp_multiview_topo1/README.md
@@ -94,7 +94,7 @@ Simplified `R1` config:
Test is executed by running
- vtysh -c "show log" | grep "Logging configuration for"
+ vtysh -c "show logging" | grep "Logging configuration for"
on router `R1`. This should return the logging information for all daemons registered
to Zebra and the list of running daemons is compared to the daemons started for this
diff --git a/tests/topotests/example-topojson-test/__init__.py b/tests/topotests/example-topojson-test/__init__.py
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/topotests/example-topojson-test/__init__.py
diff --git a/tests/topotests/example-topojson-test/test_topo_json_multiple_links/__init__.py b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/__init__.py
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/__init__.py
diff --git a/tests/topotests/example-topojson-test/test_topo_json_multiple_links/example_topojson_multiple_links.json b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/example_topojson_multiple_links.json
new file mode 100644
index 0000000000..3968348b1f
--- /dev/null
+++ b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/example_topojson_multiple_links.json
@@ -0,0 +1,152 @@
+{
+ "ipv4base": "10.0.0.0",
+ "ipv4mask": 30,
+ "ipv6base": "fd00::",
+ "ipv6mask": 64,
+ "link_ip_start": {
+ "ipv4": "10.0.0.0",
+ "v4mask": 30,
+ "ipv6": "fd00::",
+ "v6mask": 64
+ },
+ "lo_prefix": {
+ "ipv4": "1.0.",
+ "v4mask": 32,
+ "ipv6": "2001:DB8:F::",
+ "v6mask": 128
+ },
+ "routers": {
+ "r1": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2-link1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r2-link2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1-link1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1-link1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r1-link2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3-link1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3-link2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {
+ "redist_type": "static"
+ }
+ ],
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2-link1": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2-link1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "static_routes": [
+ {
+ "network": "100.0.20.1/32",
+ "no_of_ip": 9,
+ "admin_distance": 100,
+ "next_hop": "10.0.0.1"
+ }
+ ]
+ },
+ "r3": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2-link1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r2-link2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3-link1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "static_routes": [
+ {
+ "network": "10.0.0.1/30",
+ "next_hop": "10.0.0.5"
+ }
+ ]
+ }
+ }
+}
+
diff --git a/tests/topotests/example-topojson-test/test_topo_json_multiple_links/test_example_topojson_multiple_links.py b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/test_example_topojson_multiple_links.py
new file mode 100755
index 0000000000..8e794b9946
--- /dev/null
+++ b/tests/topotests/example-topojson-test/test_topo_json_multiple_links/test_example_topojson_multiple_links.py
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF")
+# in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+<example>.py: Test <example tests>.
+"""
+
+import os
+import sys
+import json
+import time
+import inspect
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, '../'))
+sys.path.append(os.path.join(CWD, '../../'))
+
+# pylint: disable=C0413
+from lib.topogen import Topogen, get_topogen
+
+# Required to instantiate the topology builder class.
+from mininet.topo import Topo
+
+# Import topoJson from lib, to create topology and initial configuration
+from lib.common_config import (
+ start_topology, write_test_header,
+ write_test_footer, verify_rib
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Reading the data from JSON File for topology and configuration creation
+jsonFile = "{}/example_topojson_multiple_links.json".format(CWD)
+try:
+ with open(jsonFile, 'r') as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ assert False, "Could not read file {}".format(jsonFile)
+
+# Global variables
+bgp_convergence = False
+input_dict = {}
+
+
+class TemplateTopo(Topo):
+ """
+ Test topology builder
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ "Build function"
+ tgen = get_topogen(self)
+
+ # This function only purpose is to create topology
+ # as defined in input json file.
+ #
+ # Example
+ #
+ # Creating 2 routers having 2 links in between,
+ # one is used to establised BGP neighborship
+
+ # Building topology from json file
+ build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("=" * 40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(TemplateTopo, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # This function only purpose is to create configuration
+ # as defined in input json file.
+ #
+ # Example
+ #
+ # Creating configuration defined in input JSON
+ # file, example, BGP config, interface config, static routes
+ # config, prefix list config
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module(mod):
+ """
+ Teardown the pytest environment
+
+ * `mod`: module name
+ """
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+
+def test_bgp_convergence(request):
+ " Test BGP daemon convergence "
+
+ tgen = get_topogen()
+ global bgp_convergence
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Api call verify whether BGP is converged
+ bgp_convergence = verify_bgp_convergence(tgen, topo)
+ assert bgp_convergence is True, "test_bgp_convergence failed.. \n" \
+ " Error: {}".format(bgp_convergence)
+
+ logger.info("BGP is converged successfully \n")
+ write_test_footer(tc_name)
+
+
+def test_static_routes(request):
+ " Test to create and verify static routes. "
+
+ tgen = get_topogen()
+ if bgp_convergence is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Static routes are created as part of initial configuration,
+ # verifying RIB
+ dut = 'r3'
+ protocol = 'bgp'
+ next_hop = '10.0.0.1'
+ input_dict = {"r1": topo["routers"]["r1"]}
+
+ # Uncomment below to debug
+ # tgen.mininet_cli()
+ result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+if __name__ == '__main__':
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link/__init__.py b/tests/topotests/example-topojson-test/test_topo_json_single_link/__init__.py
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/topotests/example-topojson-test/test_topo_json_single_link/__init__.py
diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link/example_topojson.json b/tests/topotests/example-topojson-test/test_topo_json_single_link/example_topojson.json
new file mode 100644
index 0000000000..629d2d6d78
--- /dev/null
+++ b/tests/topotests/example-topojson-test/test_topo_json_single_link/example_topojson.json
@@ -0,0 +1,153 @@
+{
+ "ipv4base": "10.0.0.0",
+ "ipv4mask": 30,
+ "ipv6base": "fd00::",
+ "ipv6mask": 64,
+ "link_ip_start": {
+ "ipv4": "10.0.0.0",
+ "v4mask": 30,
+ "ipv6": "fd00::",
+ "v6mask": 64
+ },
+ "lo_prefix": {
+ "ipv4": "1.0.",
+ "v4mask": 32,
+ "ipv6": "2001:DB8:F::",
+ "v6mask": 128
+ },
+ "routers": {
+ "r1": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {
+ "redist_type": "static"
+ }
+ ],
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "static_routes": [
+ {
+ "network": "100.0.20.1/32",
+ "no_of_ip": 9,
+ "admin_distance": 100,
+ "next_hop": "10.0.0.1"
+ }
+ ]
+ },
+ "r3": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r1": {
+ "dest_link": {
+ "r3": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "static_routes": [
+ {
+ "network": "10.0.0.1/30",
+ "next_hop": "10.0.0.5"
+ }
+ ]
+ }
+ }
+}
diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link/test_example_topojson.py b/tests/topotests/example-topojson-test/test_topo_json_single_link/test_example_topojson.py
new file mode 100755
index 0000000000..315c7b3f2d
--- /dev/null
+++ b/tests/topotests/example-topojson-test/test_topo_json_single_link/test_example_topojson.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python
+
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF")
+# in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+<example>.py: Test <example tests>.
+"""
+
+import os
+import sys
+import time
+import json
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, '../'))
+sys.path.append(os.path.join(CWD, '../../'))
+
+# pylint: disable=C0413
+from lib.topogen import Topogen, get_topogen
+
+# Required to instantiate the topology builder class.
+from mininet.topo import Topo
+
+# Import topoJson from lib, to create topology and initial configuration
+from lib.common_config import (
+ start_topology, write_test_header,
+ write_test_footer, verify_rib
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Reading the data from JSON File for topology and configuration creation
+jsonFile = "{}/example_topojson.json".format(CWD)
+
+try:
+ with open(jsonFile, 'r') as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ assert False, "Could not read file {}".format(jsonFile)
+
+# Global variables
+bgp_convergence = False
+input_dict = {}
+
+class TemplateTopo(Topo):
+ """
+ Test topology builder
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ "Build function"
+ tgen = get_topogen(self)
+
+ # This function only purpose is to create topology
+ # as defined in input json file.
+ #
+ # Example
+ #
+ # Creating 2 routers having single links in between,
+ # which is used to establised BGP neighborship
+
+ # Building topology from json file
+ build_topo_from_json(tgen, topo)
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("="*40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(TemplateTopo, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # This function only purpose is to create configuration
+ # as defined in input json file.
+ #
+ # Example
+ #
+ # Creating configuration defined in input JSON
+ # file, example, BGP config, interface config, static routes
+ # config, prefix list config
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ logger.info("Running setup_module() done")
+
+def teardown_module(mod):
+ """
+ Teardown the pytest environment
+
+ * `mod`: module name
+ """
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+
+def test_bgp_convergence(request):
+ " Test BGP daemon convergence "
+
+ tgen = get_topogen()
+ global bgp_convergence
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Api call verify whether BGP is converged
+ bgp_convergence = verify_bgp_convergence(tgen, topo)
+ assert bgp_convergence is True, "test_bgp_convergence failed.. \n"\
+ " Error: {}".format(bgp_convergence)
+
+ logger.info("BGP is converged successfully \n")
+ write_test_footer(tc_name)
+
+
+def test_static_routes(request):
+ " Test to create and verify static routes. "
+
+ tgen = get_topogen()
+ if bgp_convergence is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Static routes are created as part of initial configuration,
+ # verifying RIB
+ dut = 'r3'
+ next_hop = '10.0.0.1'
+ input_dict = {"r1": topo["routers"]["r1"]}
+
+ # Uncomment below to debug
+ # tgen.mininet_cli()
+ result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+if __name__ == '__main__':
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/__init__.py b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/__init__.py
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/__init__.py
diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/example_topojson.json b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/example_topojson.json
new file mode 100644
index 0000000000..c76c6264be
--- /dev/null
+++ b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/example_topojson.json
@@ -0,0 +1,161 @@
+{
+ "ipv4base": "10.0.0.0",
+ "ipv4mask": 30,
+ "ipv6base": "fd00::",
+ "ipv6mask": 64,
+ "link_ip_start": {
+ "ipv4": "10.0.0.0",
+ "v4mask": 30,
+ "ipv6": "fd00::",
+ "v6mask": 64
+ },
+ "lo_prefix": {
+ "ipv4": "1.0.",
+ "v4mask": 32,
+ "ipv6": "2001:DB8:F::",
+ "v6mask": 128
+ },
+ "routers": {
+ "r1": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "lo": {
+ "source_link": "lo"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "static_routes": [
+ {
+ "network": "1.0.2.17/32",
+ "next_hop": "10.0.0.2"
+ }
+ ]
+ },
+ "r2": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {
+ "redist_type": "static"
+ }
+ ],
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "lo": {
+ "source_link": "lo"
+ }
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "lo": {
+ "source_link": "lo"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "static_routes": [
+ {
+ "network": "100.0.20.1/32",
+ "no_of_ip": 9,
+ "admin_distance": 100,
+ "next_hop": "10.0.0.1"
+ },
+ {
+ "network": "1.0.1.17/32",
+ "next_hop": "10.0.0.1"
+ },
+ {
+ "network": "1.0.3.17/32",
+ "next_hop": "10.0.0.6"
+ }
+ ]
+ },
+ "r3": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "lo": {
+ "source_link": "lo"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "static_routes": [
+ {
+ "network": "1.0.2.17/32",
+ "next_hop": "10.0.0.5"
+ },
+ {
+ "network": "10.0.0.1/30",
+ "next_hop": "10.0.0.5"
+ }
+ ]
+ }
+ }
+}
diff --git a/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/test_example_topojson.py b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/test_example_topojson.py
new file mode 100755
index 0000000000..b794b96a63
--- /dev/null
+++ b/tests/topotests/example-topojson-test/test_topo_json_single_link_loopback/test_example_topojson.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python
+
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
+# ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+<example>.py: Test <example tests>.
+"""
+
+import os
+import sys
+import time
+import json
+import inspect
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, '../'))
+sys.path.append(os.path.join(CWD, '../../'))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib.topogen import Topogen, get_topogen
+
+# Required to instantiate the topology builder class.
+from mininet.topo import Topo
+
+# Import topoJson from lib, to create topology and initial configuration
+from lib.common_config import (
+ start_topology, write_test_header,
+ write_test_footer, verify_rib
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Reading the data from JSON File for topology and configuration creation
+jsonFile = "{}/example_topojson.json".format(CWD)
+
+try:
+ with open(jsonFile, 'r') as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ assert False, "Could not read file {}".format(jsonFile)
+
+# Global variables
+bgp_convergence = False
+input_dict = {}
+
+
+class TemplateTopo(Topo):
+ """
+ Test topology builder
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ "Build function"
+ tgen = get_topogen(self)
+
+ # This function only purpose is to create topology
+ # as defined in input json file.
+ #
+ # Example
+ #
+ # Creating 2 routers having single links in between,
+ # which is used to establised BGP neighborship
+
+ # Building topology from json file
+ build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("="*40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(TemplateTopo, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # This function only purpose is to create configuration
+ # as defined in input json file.
+ #
+ # Example
+ #
+ # Creating configuration defined in input JSON
+ # file, example, BGP config, interface config, static routes
+ # config, prefix list config
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module(mod):
+ """
+ Teardown the pytest environment
+
+ * `mod`: module name
+ """
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+
+def test_bgp_convergence(request):
+ " Test BGP daemon convergence "
+
+ tgen = get_topogen()
+ global bgp_convergence
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Api call verify whether BGP is converged
+ bgp_convergence = verify_bgp_convergence(tgen, topo)
+ assert bgp_convergence is True, "test_bgp_convergence failed.. \n"\
+ " Error: {}".format(bgp_convergence)
+
+ logger.info("BGP is converged successfully \n")
+ write_test_footer(tc_name)
+
+
+def test_static_routes(request):
+ " Test to create and verify static routes. "
+
+ tgen = get_topogen()
+ if bgp_convergence is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Static routes are created as part of initial configuration,
+ # verifying RIB
+ dut = 'r3'
+ next_hop = '10.0.0.1'
+ input_dict = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": "100.0.20.1/32",
+ "no_of_ip": 9,
+ "admin_distance": 100,
+ "next_hop": "10.0.0.1"
+ }
+ ]
+ }
+ }
+ # Uncomment below to debug
+ # tgen.mininet_cli()
+ result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+if __name__ == '__main__':
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
new file mode 100644
index 0000000000..13f8824976
--- /dev/null
+++ b/tests/topotests/lib/bgp.py
@@ -0,0 +1,1521 @@
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
+# ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+from copy import deepcopy
+from time import sleep
+import traceback
+import ipaddr
+from lib import topotest
+
+from lib.topolog import logger
+
+# Import common_config to use commomnly used APIs
+from lib.common_config import (create_common_configuration,
+ InvalidCLIError,
+ load_config_to_router,
+ check_address_types,
+ generate_ips,
+ find_interface_with_greater_ip)
+
+BGP_CONVERGENCE_TIMEOUT = 10
+
+
+def create_router_bgp(tgen, topo, input_dict=None, build=False):
+ """
+ API to configure bgp on router
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "local_as": "200",
+ "router_id": "22.22.22.22",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"}
+ ],
+ "advertise_networks": [
+ {
+ "network": "20.0.0.0/32",
+ "no_of_network": 10
+ },
+ {
+ "network": "30.0.0.0/32",
+ "no_of_network": 10
+ }
+ ],
+ "neighbor": {
+ "r3": {
+ "keepalivetimer": 60,
+ "holddowntimer": 180,
+ "dest_link": {
+ "r4": {
+ "prefix_lists": [
+ {
+ "name": "pf_list_1",
+ "direction": "in"
+ }
+ ],
+ "route_maps": [
+ {"name": "RMAP_MED_R3",
+ "direction": "in"}
+ ],
+ "next_hop_self": True
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+
+ Returns
+ -------
+ True or False
+ """
+ logger.debug("Entering lib API: create_router_bgp()")
+ result = False
+ if not input_dict:
+ input_dict = deepcopy(topo)
+ else:
+ topo = topo["routers"]
+ for router in input_dict.keys():
+ if "bgp" not in input_dict[router]:
+ logger.debug("Router %s: 'bgp' not present in input_dict", router)
+ continue
+
+ result = __create_bgp_global(tgen, input_dict, router, build)
+ if result is True:
+ bgp_data = input_dict[router]["bgp"]
+
+ bgp_addr_data = bgp_data.setdefault("address_family", {})
+
+ if not bgp_addr_data:
+ logger.debug("Router %s: 'address_family' not present in "
+ "input_dict for BGP", router)
+ else:
+
+ ipv4_data = bgp_addr_data.setdefault("ipv4", {})
+ ipv6_data = bgp_addr_data.setdefault("ipv6", {})
+
+ neigh_unicast = True if ipv4_data.setdefault("unicast", {}) \
+ or ipv6_data.setdefault("unicast", {}) else False
+
+ if neigh_unicast:
+ result = __create_bgp_unicast_neighbor(
+ tgen, topo, input_dict, router, build)
+
+ logger.debug("Exiting lib API: create_router_bgp()")
+ return result
+
+
+def __create_bgp_global(tgen, input_dict, router, build=False):
+ """
+ Helper API to create bgp global configuration.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `router` : router id to be configured.
+ * `build` : Only for initial setup phase this is set as True.
+
+ Returns
+ -------
+ True or False
+ """
+
+ result = False
+ logger.debug("Entering lib API: __create_bgp_global()")
+ try:
+
+ bgp_data = input_dict[router]["bgp"]
+ del_bgp_action = bgp_data.setdefault("delete", False)
+ if del_bgp_action:
+ config_data = ["no router bgp"]
+ result = create_common_configuration(tgen, router, config_data,
+ "bgp", build=build)
+ return result
+
+ config_data = []
+
+ if "local_as" not in bgp_data and build:
+ logger.error("Router %s: 'local_as' not present in input_dict"
+ "for BGP", router)
+ return False
+
+ local_as = bgp_data.setdefault("local_as", "")
+ cmd = "router bgp {}".format(local_as)
+ vrf_id = bgp_data.setdefault("vrf", None)
+ if vrf_id:
+ cmd = "{} vrf {}".format(cmd, vrf_id)
+
+ config_data.append(cmd)
+
+ router_id = bgp_data.setdefault("router_id", None)
+ del_router_id = bgp_data.setdefault("del_router_id", False)
+ if del_router_id:
+ config_data.append("no bgp router-id")
+ if router_id:
+ config_data.append("bgp router-id {}".format(
+ router_id))
+
+ aggregate_address = bgp_data.setdefault("aggregate_address",
+ {})
+ if aggregate_address:
+ network = aggregate_address.setdefault("network", None)
+ if not network:
+ logger.error("Router %s: 'network' not present in "
+ "input_dict for BGP", router)
+ else:
+ cmd = "aggregate-address {}".format(network)
+
+ as_set = aggregate_address.setdefault("as_set", False)
+ summary = aggregate_address.setdefault("summary", False)
+ del_action = aggregate_address.setdefault("delete", False)
+ if as_set:
+ cmd = "{} {}".format(cmd, "as-set")
+ if summary:
+ cmd = "{} {}".format(cmd, "summary")
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
+ result = create_common_configuration(tgen, router, config_data,
+ "bgp", build=build)
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_bgp_global()")
+ return result
+
+
+def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, build=False):
+ """
+ Helper API to create configuration for address-family unicast
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `router` : router id to be configured.
+ * `build` : Only for initial setup phase this is set as True.
+ """
+
+ result = False
+ logger.debug("Entering lib API: __create_bgp_unicast_neighbor()")
+ try:
+ config_data = ["router bgp"]
+ bgp_data = input_dict[router]["bgp"]["address_family"]
+
+ for addr_type, addr_dict in bgp_data.iteritems():
+ if not addr_dict:
+ continue
+
+ if not check_address_types(addr_type):
+ continue
+
+ config_data.append("address-family {} unicast".format(
+ addr_type
+ ))
+ addr_data = addr_dict["unicast"]
+ advertise_network = addr_data.setdefault("advertise_networks",
+ [])
+ for advertise_network_dict in advertise_network:
+ network = advertise_network_dict["network"]
+ if type(network) is not list:
+ network = [network]
+
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ del_action = advertise_network_dict.setdefault("delete",
+ False)
+
+ # Generating IPs for verification
+ prefix = str(
+ ipaddr.IPNetwork(unicode(network[0])).prefixlen)
+ network_list = generate_ips(network, no_of_network)
+ for ip in network_list:
+ ip = str(ipaddr.IPNetwork(unicode(ip)).network)
+
+ cmd = "network {}/{}\n".format(ip, prefix)
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
+ max_paths = addr_data.setdefault("maximum_paths", {})
+ if max_paths:
+ ibgp = max_paths.setdefault("ibgp", None)
+ ebgp = max_paths.setdefault("ebgp", None)
+ if ibgp:
+ config_data.append("maximum-paths ibgp {}".format(
+ ibgp
+ ))
+ if ebgp:
+ config_data.append("maximum-paths {}".format(
+ ebgp
+ ))
+
+ aggregate_address = addr_data.setdefault("aggregate_address",
+ {})
+ if aggregate_address:
+ ip = aggregate_address("network", None)
+ attribute = aggregate_address("attribute", None)
+ if ip:
+ cmd = "aggregate-address {}".format(ip)
+ if attribute:
+ cmd = "{} {}".format(cmd, attribute)
+
+ config_data.append(cmd)
+
+ redistribute_data = addr_data.setdefault("redistribute", {})
+ if redistribute_data:
+ for redistribute in redistribute_data:
+ if "redist_type" not in redistribute:
+ logger.error("Router %s: 'redist_type' not present in "
+ "input_dict", router)
+ else:
+ cmd = "redistribute {}".format(
+ redistribute["redist_type"])
+ redist_attr = redistribute.setdefault("attribute",
+ None)
+ if redist_attr:
+ cmd = "{} {}".format(cmd, redist_attr)
+ del_action = redistribute.setdefault("delete", False)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if "neighbor" in addr_data:
+ neigh_data = __create_bgp_neighbor(topo, input_dict,
+ router, addr_type)
+ config_data.extend(neigh_data)
+
+ for addr_type, addr_dict in bgp_data.iteritems():
+ if not addr_dict or not check_address_types(addr_type):
+ continue
+
+ addr_data = addr_dict["unicast"]
+ if "neighbor" in addr_data:
+ neigh_addr_data = __create_bgp_unicast_address_family(
+ topo, input_dict, router, addr_type)
+
+ config_data.extend(neigh_addr_data)
+
+ result = create_common_configuration(tgen, router, config_data,
+ None, build=build)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()")
+ return result
+
+
+def __create_bgp_neighbor(topo, input_dict, router, addr_type):
+ """
+ Helper API to create neighbor specific configuration
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `router` : router id to be configured
+ """
+
+ config_data = []
+ logger.debug("Entering lib API: __create_bgp_neighbor()")
+
+ bgp_data = input_dict[router]["bgp"]["address_family"]
+ neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
+
+ for name, peer_dict in neigh_data.iteritems():
+ for dest_link, peer in peer_dict["dest_link"].iteritems():
+ nh_details = topo[name]
+ remote_as = nh_details["bgp"]["local_as"]
+ update_source = None
+
+ if dest_link in nh_details["links"].keys():
+ ip_addr = \
+ nh_details["links"][dest_link][addr_type].split("/")[0]
+ # Loopback interface
+ if "source_link" in peer and peer["source_link"] == "lo":
+ update_source = topo[router]["links"]["lo"][
+ addr_type].split("/")[0]
+
+ neigh_cxt = "neighbor {}".format(ip_addr)
+
+ config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))
+ if addr_type == "ipv6":
+ config_data.append("address-family ipv6 unicast")
+ config_data.append("{} activate".format(neigh_cxt))
+
+ disable_connected = peer.setdefault("disable_connected_check",
+ False)
+ keep_alive = peer.setdefault("keep_alive", 60)
+ hold_down = peer.setdefault("hold_down", 180)
+ password = peer.setdefault("password", None)
+ max_hop_limit = peer.setdefault("ebgp_multihop", 1)
+
+ if update_source:
+ config_data.append("{} update-source {}".format(
+ neigh_cxt, update_source))
+ if disable_connected:
+ config_data.append("{} disable-connected-check".format(
+ disable_connected))
+ if update_source:
+ config_data.append("{} update-source {}".format(neigh_cxt,
+ update_source))
+ if int(keep_alive) != 60 and int(hold_down) != 180:
+ config_data.append(
+ "{} timers {} {}".format(neigh_cxt, keep_alive,
+ hold_down))
+ if password:
+ config_data.append(
+ "{} password {}".format(neigh_cxt, password))
+
+ if max_hop_limit > 1:
+ config_data.append("{} ebgp-multihop {}".format(neigh_cxt,
+ max_hop_limit))
+ config_data.append("{} enforce-multihop".format(neigh_cxt))
+
+ logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()")
+ return config_data
+
+
+def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type):
+ """
+ API prints bgp global config to bgp_json file.
+
+ Parameters
+ ----------
+ * `bgp_cfg` : BGP class variables have BGP config saved in it for
+ particular router,
+ * `local_as_no` : Local as number
+ * `router_id` : Router-id
+ * `ecmp_path` : ECMP max path
+ * `gr_enable` : BGP global gracefull restart config
+ """
+
+ config_data = []
+ logger.debug("Entering lib API: __create_bgp_unicast_neighbor()")
+
+ bgp_data = input_dict[router]["bgp"]["address_family"]
+ neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
+
+ for name, peer_dict in deepcopy(neigh_data).iteritems():
+ for dest_link, peer in peer_dict["dest_link"].iteritems():
+ deactivate = None
+ nh_details = topo[name]
+ # Loopback interface
+ if "source_link" in peer and peer["source_link"] == "lo":
+ for destRouterLink, data in sorted(nh_details["links"].
+ iteritems()):
+ if "type" in data and data["type"] == "loopback":
+ if dest_link == destRouterLink:
+ ip_addr = \
+ nh_details["links"][destRouterLink][
+ addr_type].split("/")[0]
+
+ # Physical interface
+ else:
+ if dest_link in nh_details["links"].keys():
+
+ ip_addr = nh_details["links"][dest_link][
+ addr_type].split("/")[0]
+ if addr_type == "ipv4" and bgp_data["ipv6"]:
+ deactivate = nh_details["links"][
+ dest_link]["ipv6"].split("/")[0]
+
+ neigh_cxt = "neighbor {}".format(ip_addr)
+ config_data.append("address-family {} unicast".format(
+ addr_type
+ ))
+ if deactivate:
+ config_data.append(
+ "no neighbor {} activate".format(deactivate))
+
+ next_hop_self = peer.setdefault("next_hop_self", None)
+ send_community = peer.setdefault("send_community", None)
+ prefix_lists = peer.setdefault("prefix_lists", {})
+ route_maps = peer.setdefault("route_maps", {})
+
+ # next-hop-self
+ if next_hop_self:
+ config_data.append("{} next-hop-self".format(neigh_cxt))
+ # no_send_community
+ if send_community:
+ config_data.append("{} send-community".format(neigh_cxt))
+
+ if prefix_lists:
+ for prefix_list in prefix_lists:
+ name = prefix_list.setdefault("name", {})
+ direction = prefix_list.setdefault("direction", "in")
+ del_action = prefix_list.setdefault("delete", False)
+ if not name:
+ logger.info("Router %s: 'name' not present in "
+ "input_dict for BGP neighbor prefix lists",
+ router)
+ else:
+ cmd = "{} prefix-list {} {}".format(neigh_cxt, name,
+ direction)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ if route_maps:
+ for route_map in route_maps:
+ name = route_map.setdefault("name", {})
+ direction = route_map.setdefault("direction", "in")
+ del_action = route_map.setdefault("delete", False)
+ if not name:
+ logger.info("Router %s: 'name' not present in "
+ "input_dict for BGP neighbor route name",
+ router)
+ else:
+ cmd = "{} route-map {} {}".format(neigh_cxt, name,
+ direction)
+ if del_action:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
+ return config_data
+
+
+#############################################
+# Verification APIs
+#############################################
+def verify_router_id(tgen, topo, input_dict):
+ """
+ Running command "show ip bgp json" for DUT and reading router-id
+ from input_dict and verifying with command output.
+ 1. Statically modfified router-id should take place
+ 2. When static router-id is deleted highest loopback should
+ become router-id
+ 3. When loopback intf is down then highest physcial intf
+ should become router-id
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `input_dict`: input dictionary, have details of Device Under Test, for
+ which user wants to test the data
+ Usage
+ -----
+ # Verify if router-id for r1 is 12.12.12.12
+ input_dict = {
+ "r1":{
+ "router_id": "12.12.12.12"
+ }
+ # Verify that router-id for r1 is highest interface ip
+ input_dict = {
+ "routers": ["r1"]
+ }
+ result = verify_router_id(tgen, topo, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_router_id()")
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ del_router_id = input_dict[router]["bgp"].setdefault(
+ "del_router_id", False)
+
+ logger.info("Checking router %s router-id", router)
+ show_bgp_json = rnode.vtysh_cmd("show ip bgp json",
+ isjson=True)
+ router_id_out = show_bgp_json["routerId"]
+ router_id_out = ipaddr.IPv4Address(unicode(router_id_out))
+
+ # Once router-id is deleted, highest interface ip should become
+ # router-id
+ if del_router_id:
+ router_id = find_interface_with_greater_ip(topo, router)
+ else:
+ router_id = input_dict[router]["bgp"]["router_id"]
+ router_id = ipaddr.IPv4Address(unicode(router_id))
+
+ if router_id == router_id_out:
+ logger.info("Found expected router-id %s for router %s",
+ router_id, router)
+ else:
+ errormsg = "Router-id for router:{} mismatch, expected:" \
+ " {} but found:{}".format(router, router_id,
+ router_id_out)
+ return errormsg
+
+ logger.info("Exiting lib API: verify_router_id()")
+ return True
+
+
+def verify_bgp_convergence(tgen, topo):
+ """
+ API will verify if BGP is converged with in the given time frame.
+ Running "show bgp summary json" command and verify bgp neighbor
+ state is established,
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type`: ip_type, ipv4/ipv6
+
+ Usage
+ -----
+ # To veriry is BGP is converged for all the routers used in
+ topology
+ results = verify_bgp_convergence(tgen, topo, "ipv4")
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_bgp_confergence()")
+ for router, rnode in tgen.routers().iteritems():
+ logger.info("Verifying BGP Convergence on router %s:", router)
+
+ for retry in range(1, 11):
+ show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+ # Verifying output dictionary show_bgp_json is empty or not
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
+ # To find neighbor ip type
+ total_peer = 0
+
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+ for addr_type in bgp_addr_type.keys():
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor in bgp_neighbors:
+ total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+
+ for addr_type in bgp_addr_type.keys():
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ no_of_peer = 0
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link in peer_data["dest_link"].keys():
+ data = topo["routers"][bgp_neighbor]["links"]
+ if dest_link in data:
+ neighbor_ip = \
+ data[dest_link][addr_type].split("/")[0]
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"][
+ "peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"][
+ "peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ if nh_state == "Established":
+ no_of_peer += 1
+ if no_of_peer == total_peer:
+ logger.info("BGP is Converged for router %s", router)
+ break
+ else:
+ logger.warning("BGP is not yet Converged for router %s",
+ router)
+ sleeptime = 2 * retry
+ if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
+ # Waiting for BGP to converge
+ logger.info("Waiting for %s sec for BGP to converge on"
+ " router %s...", sleeptime, router)
+ sleep(sleeptime)
+ else:
+ show_bgp_summary = rnode.vtysh_cmd("show bgp summary")
+ errormsg = "TIMEOUT!! BGP is not converged in {} " \
+ "seconds for router {} \n {}".format(
+ BGP_CONVERGENCE_TIMEOUT, router,
+ show_bgp_summary)
+ return errormsg
+
+ logger.info("Exiting API: verify_bgp_confergence()")
+ return True
+
+
+def modify_as_number(tgen, topo, input_dict):
+ """
+ API reads local_as and remote_as from user defined input_dict and
+ modify router"s ASNs accordingly. Router"s config is modified and
+ recent/changed config is loadeded to router.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : defines for which router ASNs needs to be modified
+
+ Usage
+ -----
+ To modify ASNs for router r1
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "local_as": 131079
+ }
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: modify_as_number()")
+ try:
+
+ new_topo = deepcopy(topo["routers"])
+ router_dict = {}
+ for router in input_dict.keys():
+ # Remove bgp configuration
+
+ router_dict.update({
+ router: {
+ "bgp": {
+ "delete": True
+ }
+ }
+ })
+
+ new_topo[router]["bgp"]["local_as"] = \
+ input_dict[router]["bgp"]["local_as"]
+
+ logger.info("Removing bgp configuration")
+ create_router_bgp(tgen, topo, router_dict)
+
+ logger.info("Applying modified bgp configuration")
+ create_router_bgp(tgen, new_topo)
+
+ except Exception as e:
+ # handle any exception
+ logger.error("Error %s occured. Arguments %s.", e.message, e.args)
+
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.info("Exiting lib API: modify_as_number()")
+
+ return True
+
+
+def verify_as_numbers(tgen, topo, input_dict):
+ """
+ This API is to verify AS numbers for given DUT by running
+ "show ip bgp neighbor json" command. Local AS and Remote AS
+ will ve verified with input_dict data and command output.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type` : ip type, ipv4/ipv6
+ * `input_dict`: defines - for which router, AS numbers needs to be verified
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "local_as": 131079
+ }
+ }
+ }
+ result = verify_as_numbers(tgen, topo, addr_type, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_as_numbers()")
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ logger.info("Verifying AS numbers for dut %s:", router)
+
+ show_ip_bgp_neighbor_json = rnode.vtysh_cmd(
+ "show ip bgp neighbor json", isjson=True)
+ local_as = input_dict[router]["bgp"]["local_as"]
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+
+ for addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
+ "neighbor"]
+
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ neighbor_ip = None
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type]. \
+ split("/")[0]
+ neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]
+ # Verify Local AS for router
+ if neigh_data["localAs"] != local_as:
+ errormsg = "Failed: Verify local_as for dut {}," \
+ " found: {} but expected: {}".format(
+ router, neigh_data["localAs"],
+ local_as)
+ return errormsg
+ else:
+ logger.info("Verified local_as for dut %s, found"
+ " expected: %s", router, local_as)
+
+ # Verify Remote AS for neighbor
+ if neigh_data["remoteAs"] != remote_as:
+ errormsg = "Failed: Verify remote_as for dut " \
+ "{}'s neighbor {}, found: {} but " \
+ "expected: {}".format(
+ router, bgp_neighbor,
+ neigh_data["remoteAs"], remote_as)
+ return errormsg
+ else:
+ logger.info("Verified remote_as for dut %s's "
+ "neighbor %s, found expected: %s",
+ router, bgp_neighbor, remote_as)
+
+ logger.info("Exiting lib API: verify_AS_numbers()")
+ return True
+
+
+def clear_bgp_and_verify(tgen, topo, router):
+ """
+ This API is to clear bgp neighborship and verify bgp neighborship
+ is coming up(BGP is converged) usinf "show bgp summary json" command
+ and also verifying for all bgp neighbors uptime before and after
+ clear bgp sessions is different as the uptime must be changed once
+ bgp sessions are cleared using "clear ip bgp */clear bgp ipv6 *" cmd.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `router`: device under test
+
+ Usage
+ -----
+ result = clear_bgp_and_verify(tgen, topo, addr_type, dut)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: clear_bgp_and_verify()")
+
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ peer_uptime_before_clear_bgp = {}
+ # Verifying BGP convergence before bgp clear command
+ for retry in range(1, 11):
+ sleeptime = 2 * retry
+ if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
+ # Waiting for BGP to converge
+ logger.info("Waiting for %s sec for BGP to converge on router"
+ " %s...", sleeptime, router)
+ sleep(sleeptime)
+ else:
+ errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \
+ " router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
+ return errormsg
+
+ show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+ logger.info(show_bgp_json)
+ # Verifying output dictionary show_bgp_json is empty or not
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
+ # To find neighbor ip type
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+ total_peer = 0
+ for addr_type in bgp_addr_type.keys():
+
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor in bgp_neighbors:
+ total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+
+ no_of_peer = 0
+ for addr_type in bgp_addr_type:
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type].split("/")[0]
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"][
+ "peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+
+ # Peer up time dictionary
+ peer_uptime_before_clear_bgp[bgp_neighbor] = \
+ ipv4_data[neighbor_ip]["peerUptime"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"][
+ "peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ # Peer up time dictionary
+ peer_uptime_before_clear_bgp[bgp_neighbor] = \
+ ipv6_data[neighbor_ip]["peerUptime"]
+
+ if nh_state == "Established":
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("BGP is Converged for router %s before bgp"
+ " clear", router)
+ break
+ else:
+ logger.warning("BGP is not yet Converged for router %s "
+ "before bgp clear", router)
+
+ # Clearing BGP
+ logger.info("Clearing BGP neighborship for router %s..", router)
+ for addr_type in bgp_addr_type.keys():
+ if addr_type == "ipv4":
+ rnode.vtysh_cmd("clear ip bgp *")
+ elif addr_type == "ipv6":
+ rnode.vtysh_cmd("clear bgp ipv6 *")
+
+ peer_uptime_after_clear_bgp = {}
+ # Verifying BGP convergence after bgp clear command
+ for retry in range(1, 11):
+ sleeptime = 2 * retry
+ if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
+ # Waiting for BGP to converge
+ logger.info("Waiting for %s sec for BGP to converge on router"
+ " %s...", sleeptime, router)
+ sleep(sleeptime)
+ else:
+ errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \
+ " router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
+ return errormsg
+
+ show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+ # Verifying output dictionary show_bgp_json is empty or not
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
+ # To find neighbor ip type
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+ total_peer = 0
+ for addr_type in bgp_addr_type.keys():
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor in bgp_neighbors:
+ total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+
+ no_of_peer = 0
+ for addr_type in bgp_addr_type:
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type].\
+ split("/")[0]
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"][
+ "peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ peer_uptime_after_clear_bgp[bgp_neighbor] = \
+ ipv4_data[neighbor_ip]["peerUptime"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"][
+ "peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+ # Peer up time dictionary
+ peer_uptime_after_clear_bgp[bgp_neighbor] = \
+ ipv6_data[neighbor_ip]["peerUptime"]
+
+ if nh_state == "Established":
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("BGP is Converged for router %s after bgp clear",
+ router)
+ break
+ else:
+ logger.warning("BGP is not yet Converged for router %s after"
+ " bgp clear", router)
+
+ # Compariung peerUptime dictionaries
+ if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
+ logger.info("BGP neighborship is reset after clear BGP on router %s",
+ router)
+ else:
+ errormsg = "BGP neighborship is not reset after clear bgp on router" \
+ " {}".format(router)
+ return errormsg
+
+ logger.info("Exiting lib API: clear_bgp_and_verify()")
+ return True
+
+
+def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
+ """
+ To verify BGP timer config, execute "show ip bgp neighbor json" command
+ and verify bgp timers with input_dict data.
+ To veirfy bgp timers functonality, shutting down peer interface
+ and verify BGP neighborship status.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type`: ip type, ipv4/ipv6
+ * `input_dict`: defines for which router, bgp timers needs to be verified
+
+ Usage:
+ # To verify BGP timers for neighbor r2 of router r1
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "bgp_neighbors":{
+ "r2":{
+ "keepalivetimer": 5,
+ "holddowntimer": 15,
+ }}}}}
+ result = verify_bgp_timers_and_functionality(tgen, topo, "ipv4",
+ input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_bgp_timers_and_functionality()")
+ sleep(5)
+ router_list = tgen.routers()
+ for router in input_dict.keys():
+ if router not in router_list:
+ continue
+
+ rnode = router_list[router]
+
+ logger.info("Verifying bgp timers functionality, DUT is %s:",
+ router)
+
+ show_ip_bgp_neighbor_json = \
+ rnode.vtysh_cmd("show ip bgp neighbor json", isjson=True)
+
+ bgp_addr_type = input_dict[router]["bgp"]["address_family"]
+
+ for addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
+ "neighbor"]
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ keepalivetimer = peer_dict["keepalivetimer"]
+ holddowntimer = peer_dict["holddowntimer"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type]. \
+ split("/")[0]
+ neighbor_intf = data[dest_link]["interface"]
+
+ # Verify HoldDownTimer for neighbor
+ bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[
+ neighbor_ip]["bgpTimerHoldTimeMsecs"]
+ if bgpHoldTimeMsecs != holddowntimer * 1000:
+ errormsg = "Verifying holddowntimer for bgp " \
+ "neighbor {} under dut {}, found: {} " \
+ "but expected: {}".format(
+ neighbor_ip, router,
+ bgpHoldTimeMsecs,
+ holddowntimer * 1000)
+ return errormsg
+
+ # Verify KeepAliveTimer for neighbor
+ bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[
+ neighbor_ip]["bgpTimerKeepAliveIntervalMsecs"]
+ if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
+ errormsg = "Verifying keepalivetimer for bgp " \
+ "neighbor {} under dut {}, found: {} " \
+ "but expected: {}".format(
+ neighbor_ip, router,
+ bgpKeepAliveTimeMsecs,
+ keepalivetimer * 1000)
+ return errormsg
+
+ ####################
+ # Shutting down peer interface after keepalive time and
+ # after some time bringing up peer interface.
+ # verifying BGP neighborship in (hold down-keep alive)
+ # time, it should not go down
+ ####################
+
+ # Wait till keep alive time
+ logger.info("=" * 20)
+ logger.info("Scenario 1:")
+ logger.info("Shutdown and bring up peer interface: %s "
+ "in keep alive time : %s sec and verify "
+ " BGP neighborship is intact in %s sec ",
+ neighbor_intf, keepalivetimer,
+ (holddowntimer - keepalivetimer))
+ logger.info("=" * 20)
+ logger.info("Waiting for %s sec..", keepalivetimer)
+ sleep(keepalivetimer)
+
+ # Shutting down peer ineterface
+ logger.info("Shutting down interface %s on router %s",
+ neighbor_intf, bgp_neighbor)
+ topotest.interface_set_status(
+ router_list[bgp_neighbor], neighbor_intf,
+ ifaceaction=False)
+
+ # Bringing up peer interface
+ sleep(5)
+ logger.info("Bringing up interface %s on router %s..",
+ neighbor_intf, bgp_neighbor)
+ topotest.interface_set_status(
+ router_list[bgp_neighbor], neighbor_intf,
+ ifaceaction=True)
+
+ # Verifying BGP neighborship is intact in
+ # (holddown - keepalive) time
+ for timer in range(keepalivetimer, holddowntimer,
+ int(holddowntimer / 3)):
+ logger.info("Waiting for %s sec..", keepalivetimer)
+ sleep(keepalivetimer)
+ sleep(2)
+ show_bgp_json = \
+ rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ if timer == \
+ (holddowntimer - keepalivetimer):
+ if nh_state != "Established":
+ errormsg = "BGP neighborship has not gone " \
+ "down in {} sec for neighbor {}\n" \
+ "show_bgp_json: \n {} ".format(
+ timer, bgp_neighbor,
+ show_bgp_json)
+ return errormsg
+ else:
+ logger.info("BGP neighborship is intact in %s"
+ " sec for neighbor %s \n "
+ "show_bgp_json : \n %s",
+ timer, bgp_neighbor,
+ show_bgp_json)
+
+ ####################
+ # Shutting down peer interface and verifying that BGP
+ # neighborship is going down in holddown time
+ ####################
+ logger.info("=" * 20)
+ logger.info("Scenario 2:")
+ logger.info("Shutdown peer interface: %s and verify BGP"
+ " neighborship has gone down in hold down "
+ "time %s sec", neighbor_intf, holddowntimer)
+ logger.info("=" * 20)
+
+ logger.info("Shutting down interface %s on router %s..",
+ neighbor_intf, bgp_neighbor)
+ topotest.interface_set_status(router_list[bgp_neighbor],
+ neighbor_intf,
+ ifaceaction=False)
+
+ # Verifying BGP neighborship is going down in holddown time
+ for timer in range(keepalivetimer,
+ (holddowntimer + keepalivetimer),
+ int(holddowntimer / 3)):
+ logger.info("Waiting for %s sec..", keepalivetimer)
+ sleep(keepalivetimer)
+ sleep(2)
+ show_bgp_json = \
+ rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ if timer == holddowntimer:
+ if nh_state == "Established":
+ errormsg = "BGP neighborship has not gone " \
+ "down in {} sec for neighbor {}\n" \
+ "show_bgp_json: \n {} ".format(
+ timer, bgp_neighbor,
+ show_bgp_json)
+ return errormsg
+ else:
+ logger.info("BGP neighborship has gone down in"
+ " %s sec for neighbor %s \n"
+ "show_bgp_json : \n %s",
+ timer, bgp_neighbor,
+ show_bgp_json)
+
+ logger.info("Exiting lib API: verify_bgp_timers_and_functionality()")
+ return True
+
+
+def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
+ attribute):
+ """
+ API is to verify best path according to BGP attributes for given routes.
+ "show bgp ipv4/6 json" command will be run and verify best path according
+ to shortest as-path, highest local-preference and med, lowest weight and
+ route origin IGP>EGP>INCOMPLETE.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `tgen` : topogen object
+ * `attribute` : calculate best path using this attribute
+ * `input_dict`: defines different routes to calculate for which route
+ best path is selected
+
+ Usage
+ -----
+ # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
+ router r7 to router r1(DUT) as per shortest as-path attribute
+ input_dict = {
+ "r7": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "advertise_networks": [
+ {
+ "network": "200.50.2.0/32"
+ },
+ {
+ "network": "200.60.2.0/32"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ attribute = "localpref"
+ result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
+ input_dict, attribute)
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_best_path_as_per_bgp_attribute()")
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ # TODO get addr_type from address
+ # Verifying show bgp json
+ command = "show bgp {} json".format(addr_type)
+
+ sleep(2)
+ logger.info("Verifying router %s RIB for best path:", router)
+ sh_ip_bgp_json = rnode.vtysh_cmd(command, isjson=True)
+
+ for route_val in input_dict.values():
+ net_data = route_val["bgp"]["address_family"]["ipv4"]["unicast"]
+ networks = net_data["advertise_networks"]
+ for network in networks:
+ route = network["network"]
+
+ route_attributes = sh_ip_bgp_json["routes"][route]
+ _next_hop = None
+ compare = None
+ attribute_dict = {}
+ for route_attribute in route_attributes:
+ next_hops = route_attribute["nexthops"]
+ for next_hop in next_hops:
+ next_hop_ip = next_hop["ip"]
+ attribute_dict[next_hop_ip] = route_attribute[attribute]
+
+ # AS_PATH attribute
+ if attribute == "aspath":
+ # Find next_hop for the route have minimum as_path
+ _next_hop = min(attribute_dict, key=lambda x: len(set(
+ attribute_dict[x])))
+ compare = "SHORTEST"
+
+ # LOCAL_PREF attribute
+ elif attribute == "localpref":
+ # Find next_hop for the route have highest local preference
+ _next_hop = max(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "HIGHEST"
+
+ # WEIGHT attribute
+ elif attribute == "weight":
+ # Find next_hop for the route have highest weight
+ _next_hop = max(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "HIGHEST"
+
+ # ORIGIN attribute
+ elif attribute == "origin":
+ # Find next_hop for the route have IGP as origin, -
+ # - rule is IGP>EGP>INCOMPLETE
+ _next_hop = [key for (key, value) in
+ attribute_dict.iteritems()
+ if value == "IGP"][0]
+ compare = ""
+
+ # MED attribute
+ elif attribute == "med":
+ # Find next_hop for the route have LOWEST MED
+ _next_hop = min(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "LOWEST"
+
+ # Show ip route
+ if addr_type == "ipv4":
+ command = "show ip route json"
+ else:
+ command = "show ipv6 route json"
+
+ rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if not bool(rib_routes_json):
+ errormsg = "No route found in RIB of router {}..". \
+ format(router)
+ return errormsg
+
+ st_found = False
+ nh_found = False
+ # Find best is installed in RIB
+ if route in rib_routes_json:
+ st_found = True
+ # Verify next_hop in rib_routes_json
+ if rib_routes_json[route][0]["nexthops"][0]["ip"] == \
+ _next_hop:
+ nh_found = True
+ else:
+ errormsg = "Incorrect Nexthop for BGP route {} in " \
+ "RIB of router {}, Expected: {}, Found:" \
+ " {}\n".format(route, router,
+ rib_routes_json[route][0][
+ "nexthops"][0]["ip"],
+ _next_hop)
+ return errormsg
+
+ if st_found and nh_found:
+ logger.info(
+ "Best path for prefix: %s with next_hop: %s is "
+ "installed according to %s %s: (%s) in RIB of "
+ "router %s", route, _next_hop, compare,
+ attribute, attribute_dict[_next_hop], router)
+
+ logger.debug("Exiting lib API: verify_best_path_as_per_bgp_attribute()")
+ return True
+
+
+def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
+ attribute):
+ """
+ API is to verify best path according to admin distance for given
+ route. "show ip/ipv6 route json" command will be run and verify
+ best path accoring to shortest admin distanc.
+
+ Parameters
+ ----------
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test
+ * `tgen` : topogen object
+ * `attribute` : calculate best path using admin distance
+ * `input_dict`: defines different routes with different admin distance
+ to calculate for which route best path is selected
+ Usage
+ -----
+ # To verify best path for route 200.50.2.0/32 from router r2 to
+ router r1(DUT) as per shortest admin distance which is 60.
+ input_dict = {
+ "r2": {
+ "static_routes": [{"network": "200.50.2.0/32", \
+ "admin_distance": 80, "next_hop": "10.0.0.14"},
+ {"network": "200.50.2.0/32", \
+ "admin_distance": 60, "next_hop": "10.0.0.18"}]
+ }}
+ attribute = "localpref"
+ result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
+ input_dict, attribute):
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_best_path_as_per_admin_distance()")
+ router_list = tgen.routers()
+ if router not in router_list:
+ return False
+
+ rnode = tgen.routers()[router]
+
+ sleep(2)
+ logger.info("Verifying router %s RIB for best path:", router)
+
+ # Show ip route cmd
+ if addr_type == "ipv4":
+ command = "show ip route json"
+ else:
+ command = "show ipv6 route json"
+
+ for routes_from_router in input_dict.keys():
+ sh_ip_route_json = router_list[routes_from_router].vtysh_cmd(
+ command, isjson=True)
+ networks = input_dict[routes_from_router]["static_routes"]
+ for network in networks:
+ route = network["network"]
+
+ route_attributes = sh_ip_route_json[route]
+ _next_hop = None
+ compare = None
+ attribute_dict = {}
+ for route_attribute in route_attributes:
+ next_hops = route_attribute["nexthops"]
+ for next_hop in next_hops:
+ next_hop_ip = next_hop["ip"]
+ attribute_dict[next_hop_ip] = route_attribute["distance"]
+
+ # Find next_hop for the route have LOWEST Admin Distance
+ _next_hop = min(attribute_dict, key=(lambda k:
+ attribute_dict[k]))
+ compare = "LOWEST"
+
+ # Show ip route
+ rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if not bool(rib_routes_json):
+ errormsg = "No route found in RIB of router {}..".format(router)
+ return errormsg
+
+ st_found = False
+ nh_found = False
+ # Find best is installed in RIB
+ if route in rib_routes_json:
+ st_found = True
+ # Verify next_hop in rib_routes_json
+ if rib_routes_json[route][0]["nexthops"][0]["ip"] == \
+ _next_hop:
+ nh_found = True
+ else:
+ errormsg = ("Nexthop {} is Missing for BGP route {}"
+ " in RIB of router {}\n".format(_next_hop,
+ route, router))
+ return errormsg
+
+ if st_found and nh_found:
+ logger.info("Best path for prefix: %s is installed according"
+ " to %s %s: (%s) in RIB of router %s", route,
+ compare, attribute,
+ attribute_dict[_next_hop], router)
+
+ logger.info(
+ "Exiting lib API: verify_best_path_as_per_admin_distance()")
+ return True
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
new file mode 100644
index 0000000000..d2c1d82430
--- /dev/null
+++ b/tests/topotests/lib/common_config.py
@@ -0,0 +1,1391 @@
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
+# ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+from collections import OrderedDict
+from datetime import datetime
+from time import sleep
+from subprocess import call
+from subprocess import STDOUT as SUB_STDOUT
+import StringIO
+import os
+import ConfigParser
+import traceback
+import socket
+import ipaddr
+
+from lib.topolog import logger, logger_config
+from lib.topogen import TopoRouter
+
+
+FRRCFG_FILE = "frr_json.conf"
+FRRCFG_BKUP_FILE = "frr_json_initial.conf"
+
+ERROR_LIST = ["Malformed", "Failure", "Unknown"]
+
+####
+CD = os.path.dirname(os.path.realpath(__file__))
+PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")
+
+# Creating tmp dir with testsuite name to avoid conflict condition when
+# multiple testsuites run together. All temporary files would be created
+# in this dir and this dir would be removed once testsuite run is
+# completed
+LOGDIR = "/tmp/topotests/"
+TMPDIR = None
+
+# NOTE: to save execution logs to log file frrtest_log_dir must be configured
+# in `pytest.ini`.
+config = ConfigParser.ConfigParser()
+config.read(PYTESTINI_PATH)
+
+config_section = "topogen"
+
+if config.has_option("topogen", "verbosity"):
+ loglevel = config.get("topogen", "verbosity")
+ loglevel = loglevel.upper()
+else:
+ loglevel = "INFO"
+
+if config.has_option("topogen", "frrtest_log_dir"):
+ frrtest_log_dir = config.get("topogen", "frrtest_log_dir")
+ time_stamp = datetime.time(datetime.now())
+ logfile_name = "frr_test_bgp_"
+ frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
+ print("frrtest_log_file..", frrtest_log_file)
+
+ logger = logger_config.get_logger(name="test_execution_logs",
+ log_level=loglevel,
+ target=frrtest_log_file)
+ print("Logs will be sent to logfile: {}".format(frrtest_log_file))
+
+if config.has_option("topogen", "show_router_config"):
+ show_router_config = config.get("topogen", "show_router_config")
+else:
+ show_router_config = False
+
+# env variable for setting what address type to test
+ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
+
+
+# Saves sequence id numbers
+SEQ_ID = {
+ "prefix_lists": {},
+ "route_maps": {}
+}
+
+
+def get_seq_id(obj_type, router, obj_name):
+ """
+ Generates and saves sequence number in interval of 10
+
+ Parameters
+ ----------
+ * `obj_type`: prefix_lists or route_maps
+ * `router`: router name
+ *` obj_name`: name of the prefix-list or route-map
+
+ Returns
+ --------
+ Sequence number generated
+ """
+
+ router_data = SEQ_ID[obj_type].setdefault(router, {})
+ obj_data = router_data.setdefault(obj_name, {})
+ seq_id = obj_data.setdefault("seq_id", 0)
+
+ seq_id = int(seq_id) + 10
+ obj_data["seq_id"] = seq_id
+
+ return seq_id
+
+
+def set_seq_id(obj_type, router, id, obj_name):
+ """
+ Saves sequence number if not auto-generated and given by user
+
+ Parameters
+ ----------
+ * `obj_type`: prefix_lists or route_maps
+ * `router`: router name
+ *` obj_name`: name of the prefix-list or route-map
+ """
+ router_data = SEQ_ID[obj_type].setdefault(router, {})
+ obj_data = router_data.setdefault(obj_name, {})
+ seq_id = obj_data.setdefault("seq_id", 0)
+
+ seq_id = int(seq_id) + int(id)
+ obj_data["seq_id"] = seq_id
+
+
+class InvalidCLIError(Exception):
+ """Raise when the CLI command is wrong"""
+ pass
+
+
+def create_common_configuration(tgen, router, data, config_type=None,
+ build=False):
+ """
+ API to create object of class FRRConfig and also create frr_json.conf
+ file. It will create interface and common configurations and save it to
+ frr_json.conf and load to router
+
+ Parameters
+ ----------
+ * `tgen`: tgen onject
+ * `data`: Congiguration data saved in a list.
+ * `router` : router id to be configured.
+ * `config_type` : Syntactic information while writing configuration. Should
+ be one of the value as mentioned in the config_map below.
+ * `build` : Only for initial setup phase this is set as True
+
+ Returns
+ -------
+ True or False
+ """
+ TMPDIR = os.path.join(LOGDIR, tgen.modname)
+
+ fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE)
+
+ config_map = OrderedDict({
+ "general_config": "! FRR General Config\n",
+ "interface_config": "! Interfaces Config\n",
+ "static_route": "! Static Route Config\n",
+ "prefix_list": "! Prefix List Config\n",
+ "route_maps": "! Route Maps Config\n",
+ "bgp": "! BGP Config\n"
+ })
+
+ if build:
+ mode = "a"
+ else:
+ mode = "w"
+
+ try:
+ frr_cfg_fd = open(fname, mode)
+ if config_type:
+ frr_cfg_fd.write(config_map[config_type])
+ for line in data:
+ frr_cfg_fd.write("{} \n".format(str(line)))
+
+ except IOError as err:
+ logger.error("Unable to open FRR Config File. error(%s): %s" %
+ (err.errno, err.strerror))
+ return False
+ finally:
+ frr_cfg_fd.close()
+
+ # If configuration applied from build, it will done at last
+ if not build:
+ load_config_to_router(tgen, router)
+
+ return True
+
+
+def reset_config_on_routers(tgen, routerName=None):
+ """
+ Resets configuration on routers to the snapshot created using input JSON
+ file. It replaces existing router configuration with FRRCFG_BKUP_FILE
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `routerName` : router config is to be reset
+ """
+
+ logger.debug("Entering API: reset_config_on_routers")
+
+ router_list = tgen.routers()
+ for rname, router in router_list.iteritems():
+ if routerName and routerName != rname:
+ continue
+
+ cfg = router.run("vtysh -c 'show running'")
+ fname = "{}/{}/frr.sav".format(TMPDIR, rname)
+ dname = "{}/{}/delta.conf".format(TMPDIR, rname)
+ f = open(fname, "w")
+ for line in cfg.split("\n"):
+ line = line.strip()
+
+ if (line == "Building configuration..." or
+ line == "Current configuration:" or
+ not line):
+ continue
+ f.write(line)
+ f.write("\n")
+
+ f.close()
+
+ command = "/usr/lib/frr/frr-reload.py --input {}/{}/frr.sav" \
+ " --test {}/{}/frr_json_initial.conf > {}". \
+ format(TMPDIR, rname, TMPDIR, rname, dname)
+ result = call(command, shell=True, stderr=SUB_STDOUT)
+
+ # Assert if command fail
+ if result > 0:
+ errormsg = ("Command:{} is failed due to non-zero exit"
+ " code".format(command))
+ return errormsg
+
+ f = open(dname, "r")
+ delta = StringIO.StringIO()
+ delta.write("configure terminal\n")
+ t_delta = f.read()
+ for line in t_delta.split("\n"):
+ line = line.strip()
+ if (line == "Lines To Delete" or
+ line == "===============" or
+ line == "Lines To Add" or
+ line == "============" or
+ not line):
+ continue
+ delta.write(line)
+ delta.write("\n")
+
+ delta.write("end\n")
+ output = router.vtysh_multicmd(delta.getvalue(),
+ pretty_output=False)
+ logger.info("New configuration for router {}:".format(rname))
+ delta.close()
+ delta = StringIO.StringIO()
+ cfg = router.run("vtysh -c 'show running'")
+ for line in cfg.split("\n"):
+ line = line.strip()
+ delta.write(line)
+ delta.write("\n")
+
+ # Router current configuration to log file or console if
+ # "show_router_config" is defined in "pytest.ini"
+ if show_router_config:
+ logger.info(delta.getvalue())
+ delta.close()
+
+ logger.debug("Exting API: reset_config_on_routers")
+ return True
+
+
+def load_config_to_router(tgen, routerName, save_bkup=False):
+ """
+ Loads configuration on router from the file FRRCFG_FILE.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `routerName` : router for which configuration to be loaded
+ * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
+ """
+
+ logger.debug("Entering API: load_config_to_router")
+
+ router_list = tgen.routers()
+ for rname, router in router_list.iteritems():
+ if rname == routerName:
+ try:
+ frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE)
+ frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname,
+ FRRCFG_BKUP_FILE)
+ with open(frr_cfg_file, "r") as cfg:
+ data = cfg.read()
+ if save_bkup:
+ with open(frr_cfg_bkup, "w") as bkup:
+ bkup.write(data)
+
+ output = router.vtysh_multicmd(data, pretty_output=False)
+ for out_err in ERROR_LIST:
+ if out_err.lower() in output.lower():
+ raise InvalidCLIError("%s" % output)
+ except IOError as err:
+ errormsg = ("Unable to open config File. error(%s):"
+ " %s", (err.errno, err.strerror))
+ return errormsg
+
+ logger.info("New configuration for router {}:".format(rname))
+ new_config = router.run("vtysh -c 'show running'")
+
+ # Router current configuration to log file or console if
+ # "show_router_config" is defined in "pytest.ini"
+ if show_router_config:
+ logger.info(new_config)
+
+ logger.debug("Exting API: load_config_to_router")
+ return True
+
+
+def start_topology(tgen):
+ """
+ Starting topology, create tmp files which are loaded to routers
+ to start deamons and then start routers
+ * `tgen` : topogen object
+ """
+
+ global TMPDIR
+ # Starting topology
+ tgen.start_topology()
+
+ # Starting deamons
+ router_list = tgen.routers()
+ TMPDIR = os.path.join(LOGDIR, tgen.modname)
+
+ for rname, router in router_list.iteritems():
+ try:
+ os.chdir(TMPDIR)
+
+ # Creating rouer named dir and empty zebra.conf bgpd.conf files
+ # inside the current directory
+
+ if os.path.isdir('{}'.format(rname)):
+ os.system("rm -rf {}".format(rname))
+ os.mkdir('{}'.format(rname))
+ os.system('chmod -R go+rw {}'.format(rname))
+ os.chdir('{}/{}'.format(TMPDIR, rname))
+ os.system('touch zebra.conf bgpd.conf')
+ else:
+ os.mkdir('{}'.format(rname))
+ os.system('chmod -R go+rw {}'.format(rname))
+ os.chdir('{}/{}'.format(TMPDIR, rname))
+ os.system('touch zebra.conf bgpd.conf')
+
+ except IOError as (errno, strerror):
+ logger.error("I/O error({0}): {1}".format(errno, strerror))
+
+ # Loading empty zebra.conf file to router, to start the zebra deamon
+ router.load_config(
+ TopoRouter.RD_ZEBRA,
+ '{}/{}/zebra.conf'.format(TMPDIR, rname)
+ # os.path.join(tmpdir, '{}/zebra.conf'.format(rname))
+ )
+ # Loading empty bgpd.conf file to router, to start the bgp deamon
+ router.load_config(
+ TopoRouter.RD_BGP,
+ '{}/{}/bgpd.conf'.format(TMPDIR, rname)
+ # os.path.join(tmpdir, '{}/bgpd.conf'.format(rname))
+ )
+
+ # Starting routers
+ logger.info("Starting all routers once topology is created")
+ tgen.start_router()
+
+
+def number_to_row(routerName):
+ """
+ Returns the number for the router.
+ Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
+ etc
+ """
+ return int(routerName[1:])
+
+
+def number_to_column(routerName):
+ """
+ Returns the number for the router.
+ Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
+ z23 = column 26 etc
+ """
+ return ord(routerName[0]) - 97
+
+
+#############################################
+# Common APIs, will be used by all protocols
+#############################################
+
+def validate_ip_address(ip_address):
+ """
+ Validates the type of ip address
+
+ Parameters
+ ----------
+ * `ip_address`: IPv4/IPv6 address
+
+ Returns
+ -------
+ Type of address as string
+ """
+
+ if "/" in ip_address:
+ ip_address = ip_address.split("/")[0]
+
+ v4 = True
+ v6 = True
+ try:
+ socket.inet_aton(ip_address)
+ except socket.error as error:
+ logger.debug("Not a valid IPv4 address")
+ v4 = False
+ else:
+ return "ipv4"
+
+ try:
+ socket.inet_pton(socket.AF_INET6, ip_address)
+ except socket.error as error:
+ logger.debug("Not a valid IPv6 address")
+ v6 = False
+ else:
+ return "ipv6"
+
+ if not v4 and not v6:
+ raise Exception("InvalidIpAddr", "%s is neither valid IPv4 or IPv6"
+ " address" % ip_address)
+
+
+def check_address_types(addr_type):
+ """
+ Checks environment variable set and compares with the current address type
+ """
+ global ADDRESS_TYPES
+ if ADDRESS_TYPES is None:
+ ADDRESS_TYPES = "dual"
+
+ if ADDRESS_TYPES == "dual":
+ ADDRESS_TYPES = ["ipv4", "ipv6"]
+ elif ADDRESS_TYPES == "ipv4":
+ ADDRESS_TYPES = ["ipv4"]
+ elif ADDRESS_TYPES == "ipv6":
+ ADDRESS_TYPES = ["ipv6"]
+
+ if addr_type not in ADDRESS_TYPES:
+ logger.error("{} not in supported/configured address types {}".
+ format(addr_type, ADDRESS_TYPES))
+ return False
+
+ return ADDRESS_TYPES
+
+
+def generate_ips(network, no_of_ips):
+ """
+ Returns list of IPs.
+ based on start_ip and no_of_ips
+
+ * `network` : from here the ip will start generating, start_ip will be
+ first ip
+ * `no_of_ips` : these many IPs will be generated
+
+ Limitation: It will generate IPs only for ip_mask 32
+
+ """
+ ipaddress_list = []
+ if type(network) is not list:
+ network = [network]
+
+ for start_ipaddr in network:
+ if "/" in start_ipaddr:
+ start_ip = start_ipaddr.split("/")[0]
+ mask = int(start_ipaddr.split("/")[1])
+
+ addr_type = validate_ip_address(start_ip)
+ if addr_type == "ipv4":
+ start_ip = ipaddr.IPv4Address(unicode(start_ip))
+ step = 2 ** (32 - mask)
+ if addr_type == "ipv6":
+ start_ip = ipaddr.IPv6Address(unicode(start_ip))
+ step = 2 ** (128 - mask)
+
+ next_ip = start_ip
+ count = 0
+ while count < no_of_ips:
+ ipaddress_list.append("{}/{}".format(next_ip, mask))
+ if addr_type == "ipv6":
+ next_ip = ipaddr.IPv6Address(int(next_ip) + step)
+ else:
+ next_ip += step
+ count += 1
+
+ return ipaddress_list
+
+
+def find_interface_with_greater_ip(topo, router, loopback=True,
+ interface=True):
+ """
+ Returns highest interface ip for ipv4/ipv6. If loopback is there then
+ it will return highest IP from loopback IPs otherwise from physical
+ interface IPs.
+
+ * `topo` : json file data
+ * `router` : router for which hightest interface should be calculated
+ """
+
+ link_data = topo["routers"][router]["links"]
+ lo_list = []
+ interfaces_list = []
+ lo_exists = False
+ for destRouterLink, data in sorted(link_data.iteritems()):
+ if loopback:
+ if "type" in data and data["type"] == "loopback":
+ lo_exists = True
+ ip_address = topo["routers"][router]["links"][
+ destRouterLink]["ipv4"].split("/")[0]
+ lo_list.append(ip_address)
+ if interface:
+ ip_address = topo["routers"][router]["links"][
+ destRouterLink]["ipv4"].split("/")[0]
+ interfaces_list.append(ip_address)
+
+ if lo_exists:
+ return sorted(lo_list)[-1]
+
+ return sorted(interfaces_list)[-1]
+
+
+def write_test_header(tc_name):
+ """ Display message at beginning of test case"""
+ count = 20
+ logger.info("*"*(len(tc_name)+count))
+ logger.info("START -> Testcase : %s", tc_name)
+ logger.info("*"*(len(tc_name)+count))
+
+
+def write_test_footer(tc_name):
+ """ Display message at end of test case"""
+ count = 21
+ logger.info("="*(len(tc_name)+count))
+ logger.info("PASSED -> Testcase : %s", tc_name)
+ logger.info("="*(len(tc_name)+count))
+
+
+#############################################
+# These APIs, will used by testcase
+#############################################
+def create_interfaces_cfg(tgen, topo, build=False):
+ """
+ Create interface configuration for created topology. Basic Interface
+ configuration is provided in input json file.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `build` : Only for initial setup phase this is set as True.
+
+ Returns
+ -------
+ True or False
+ """
+ result = False
+
+ try:
+ for c_router, c_data in topo.iteritems():
+ interface_data = []
+ for destRouterLink, data in sorted(c_data["links"].iteritems()):
+ # Loopback interfaces
+ if "type" in data and data["type"] == "loopback":
+ interface_name = destRouterLink
+ else:
+ interface_name = data["interface"]
+ interface_data.append("interface {}\n".format(
+ str(interface_name)
+ ))
+ if "ipv4" in data:
+ intf_addr = c_data["links"][destRouterLink]["ipv4"]
+ interface_data.append("ip address {}\n".format(
+ intf_addr
+ ))
+ if "ipv6" in data:
+ intf_addr = c_data["links"][destRouterLink]["ipv6"]
+ interface_data.append("ipv6 address {}\n".format(
+ intf_addr
+ ))
+ result = create_common_configuration(tgen, c_router,
+ interface_data,
+ "interface_config",
+ build=build)
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ return result
+
+
+def create_static_routes(tgen, input_dict, build=False):
+ """
+ Create static routes for given router as defined in input_dict
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ input_dict should be in the format below:
+ # static_routes: list of all routes
+ # network: network address
+ # no_of_ip: number of next-hop address that will be configured
+ # admin_distance: admin distance for route/routes.
+ # next_hop: starting next-hop address
+ # tag: tag id for static routes
+ # delete: True if config to be removed. Default False.
+
+ Example:
+ "routers": {
+ "r1": {
+ "static_routes": [
+ {
+ "network": "100.0.20.1/32",
+ "no_of_ip": 9,
+ "admin_distance": 100,
+ "next_hop": "10.0.0.1",
+ "tag": 4001
+ "delete": true
+ }
+ ]
+ }
+ }
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+ result = False
+ logger.debug("Entering lib API: create_static_routes()")
+ try:
+ for router in input_dict.keys():
+ if "static_routes" not in input_dict[router]:
+ errormsg = "static_routes not present in input_dict"
+ logger.info(errormsg)
+ continue
+
+ static_routes_list = []
+
+ static_routes = input_dict[router]["static_routes"]
+ for static_route in static_routes:
+ del_action = static_route.setdefault("delete", False)
+ # No of IPs
+ no_of_ip = static_route.setdefault("no_of_ip", 1)
+ admin_distance = static_route.setdefault("admin_distance",
+ None)
+ tag = static_route.setdefault("tag", None)
+ if "next_hop" not in static_route or \
+ "network" not in static_route:
+ errormsg = "'next_hop' or 'network' missing in" \
+ " input_dict"
+ return errormsg
+
+ next_hop = static_route["next_hop"]
+ network = static_route["network"]
+ ip_list = generate_ips([network], no_of_ip)
+ for ip in ip_list:
+ addr_type = validate_ip_address(ip)
+ if addr_type == "ipv4":
+ cmd = "ip route {} {}".format(ip, next_hop)
+ else:
+ cmd = "ipv6 route {} {}".format(ip, next_hop)
+
+ if tag:
+ cmd = "{} {}".format(cmd, str(tag))
+ if admin_distance:
+ cmd = "{} {}".format(cmd, admin_distance)
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ static_routes_list.append(cmd)
+
+ result = create_common_configuration(tgen, router,
+ static_routes_list,
+ "static_route",
+ build=build)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_static_routes()")
+ return result
+
+
+def create_prefix_lists(tgen, input_dict, build=False):
+ """
+ Create ip prefix lists as per the config provided in input
+ JSON or input_dict
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ # pf_lists_1: name of prefix-list, user defined
+ # seqid: prefix-list seqid, auto-generated if not given by user
+ # network: criteria for applying prefix-list
+ # action: permit/deny
+ # le: less than or equal number of bits
+ # ge: greater than or equal number of bits
+
+ Example
+ -------
+ input_dict = {
+ "r1": {
+ "prefix_lists":{
+ "ipv4": {
+ "pf_list_1": [
+ {
+ "seqid": 10,
+ "network": "any",
+ "action": "permit",
+ "le": "32",
+ "ge": "30",
+ "delete": True
+ }
+ ]
+ }
+ }
+ }
+ }
+
+ Returns
+ -------
+ errormsg or True
+ """
+
+ logger.debug("Entering lib API: create_prefix_lists()")
+ result = False
+ try:
+ for router in input_dict.keys():
+ if "prefix_lists" not in input_dict[router]:
+ errormsg = "prefix_lists not present in input_dict"
+ logger.info(errormsg)
+ continue
+
+ config_data = []
+ prefix_lists = input_dict[router]["prefix_lists"]
+ for addr_type, prefix_data in prefix_lists.iteritems():
+ if not check_address_types(addr_type):
+ continue
+
+ for prefix_name, prefix_list in prefix_data.iteritems():
+ for prefix_dict in prefix_list:
+ if "action" not in prefix_dict or \
+ "network" not in prefix_dict:
+ errormsg = "'action' or network' missing in" \
+ " input_dict"
+ return errormsg
+
+ network_addr = prefix_dict["network"]
+ action = prefix_dict["action"]
+ le = prefix_dict.setdefault("le", None)
+ ge = prefix_dict.setdefault("ge", None)
+ seqid = prefix_dict.setdefault("seqid", None)
+ del_action = prefix_dict.setdefault("delete", False)
+ if seqid is None:
+ seqid = get_seq_id("prefix_lists", router,
+ prefix_name)
+ else:
+ set_seq_id("prefix_lists", router, seqid,
+ prefix_name)
+
+ if addr_type == "ipv4":
+ protocol = "ip"
+ else:
+ protocol = "ipv6"
+
+ cmd = "{} prefix-list {} seq {} {} {}".format(
+ protocol, prefix_name, seqid, action, network_addr
+ )
+ if le:
+ cmd = "{} le {}".format(cmd, le)
+ if ge:
+ cmd = "{} ge {}".format(cmd, ge)
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+ result = create_common_configuration(tgen, router,
+ config_data,
+ "prefix_list",
+ build=build)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_prefix_lists()")
+ return result
+
+
+def create_route_maps(tgen, input_dict, build=False):
+ """
+ Create route-map on the devices as per the arguments passed
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `input_dict` : Input dict data, required when configuring from testcase
+ * `build` : Only for initial setup phase this is set as True.
+
+ Usage
+ -----
+ # route_maps: key, value pair for route-map name and its attribute
+ # rmap_match_prefix_list_1: user given name for route-map
+ # action: PERMIT/DENY
+ # match: key,value pair for match criteria. prefix_list, community-list,
+ large-community-list or tag. Only one option at a time.
+ # prefix_list: name of prefix list
+ # large-community-list: name of large community list
+ # community-ist: name of community list
+ # tag: tag id for static routes
+ # set: key, value pair for modifying route attributes
+ # localpref: preference value for the network
+ # med: metric value advertised for AS
+ # aspath: set AS path value
+ # weight: weight for the route
+ # community: standard community value to be attached
+ # large_community: large community value to be attached
+ # community_additive: if set to "additive", adds community/large-community
+ value to the existing values of the network prefix
+
+ Example:
+ --------
+ input_dict = {
+ "r1": {
+ "route_maps": {
+ "rmap_match_prefix_list_1": [
+ {
+ "action": "PERMIT",
+ "match": {
+ "ipv4": {
+ "prefix_list": "pf_list_1"
+ }
+ "ipv6": {
+ "prefix_list": "pf_list_1"
+ }
+
+ "large-community-list": "{
+ "id": "community_1",
+ "exact_match": True
+ }
+ "community": {
+ "id": "community_2",
+ "exact_match": True
+ }
+ "tag": "tag_id"
+ },
+ "set": {
+ "localpref": 150,
+ "med": 30,
+ "aspath": {
+ "num": 20000,
+ "action": "prepend",
+ },
+ "weight": 500,
+ "community": {
+ "num": "1:2 2:3",
+ "action": additive
+ }
+ "large_community": {
+ "num": "1:2:3 4:5;6",
+ "action": additive
+ },
+ }
+ }
+ ]
+ }
+ }
+ }
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ result = False
+ logger.debug("Entering lib API: create_route_maps()")
+
+ try:
+ for router in input_dict.keys():
+ if "route_maps" not in input_dict[router]:
+ errormsg = "route_maps not present in input_dict"
+ logger.info(errormsg)
+ continue
+ rmap_data = []
+ for rmap_name, rmap_value in \
+ input_dict[router]["route_maps"].iteritems():
+
+ for rmap_dict in rmap_value:
+ del_action = rmap_dict.setdefault("delete", False)
+
+ if del_action:
+ rmap_data.append("no route-map {}".format(rmap_name))
+ continue
+
+ if "action" not in rmap_dict:
+ errormsg = "action not present in input_dict"
+ logger.error(errormsg)
+ return False
+
+ rmap_action = rmap_dict.setdefault("action", "deny")
+
+ seq_id = rmap_dict.setdefault("seq_id", None)
+ if seq_id is None:
+ seq_id = get_seq_id("route_maps", router, rmap_name)
+ else:
+ set_seq_id("route_maps", router, seq_id, rmap_name)
+
+ rmap_data.append("route-map {} {} {}".format(
+ rmap_name, rmap_action, seq_id
+ ))
+
+ # Verifying if SET criteria is defined
+ if "set" in rmap_dict:
+ set_data = rmap_dict["set"]
+
+ local_preference = set_data.setdefault("localpref",
+ None)
+ metric = set_data.setdefault("med", None)
+ as_path = set_data.setdefault("aspath", {})
+ weight = set_data.setdefault("weight", None)
+ community = set_data.setdefault("community", {})
+ large_community = set_data.setdefault(
+ "large_community", {})
+ set_action = set_data.setdefault("set_action", None)
+
+ # Local Preference
+ if local_preference:
+ rmap_data.append("set local-preference {}".
+ format(local_preference))
+
+ # Metric
+ if metric:
+ rmap_data.append("set metric {} \n".format(metric))
+
+ # AS Path Prepend
+ if as_path:
+ as_num = as_path.setdefault("as_num", None)
+ as_action = as_path.setdefault("as_action", None)
+ if as_action and as_num:
+ rmap_data.append("set as-path {} {}".
+ format(as_action, as_num))
+
+ # Community
+ if community:
+ num = community.setdefault("num", None)
+ comm_action = community.setdefault("action", None)
+ if num:
+ cmd = "set community {}".format(num)
+ if comm_action:
+ cmd = "{} {}".format(cmd, comm_action)
+ rmap_data.append(cmd)
+ else:
+ logger.error("In community, AS Num not"
+ " provided")
+ return False
+
+ if large_community:
+ num = large_community.setdefault("num", None)
+ comm_action = large_community.setdefault("action",
+ None)
+ if num:
+ cmd = "set large-community {}".format(num)
+ if comm_action:
+ cmd = "{} {}".format(cmd, comm_action)
+
+ rmap_data.append(cmd)
+ else:
+ logger.errror("In large_community, AS Num not"
+ " provided")
+ return False
+
+ # Weight
+ if weight:
+ rmap_data.append("set weight {} \n".format(
+ weight))
+
+ # Adding MATCH and SET sequence to RMAP if defined
+ if "match" in rmap_dict:
+ match_data = rmap_dict["match"]
+ ipv4_data = match_data.setdefault("ipv4", {})
+ ipv6_data = match_data.setdefault("ipv6", {})
+ community = match_data.setdefault("community-list",
+ {})
+ large_community = match_data.setdefault(
+ "large-community-list", {}
+ )
+ tag = match_data.setdefault("tag", None)
+
+ if ipv4_data:
+ prefix_name = ipv4_data.setdefault("prefix_lists",
+ None)
+ if prefix_name:
+ rmap_data.append("match ip address prefix-list"
+ " {}".format(prefix_name))
+ if ipv6_data:
+ prefix_name = ipv6_data.setdefault("prefix_lists",
+ None)
+ if prefix_name:
+ rmap_data.append("match ipv6 address "
+ "prefix-list {}".
+ format(prefix_name))
+ if tag:
+ rmap_data.append("match tag {}".format(tag))
+
+ if community:
+ if "id" not in community:
+ logger.error("'id' is mandatory for "
+ "community-list in match"
+ " criteria")
+ return False
+ cmd = "match community {}".format(community["id"])
+ exact_match = community.setdefault("exact_match",
+ False)
+ if exact_match:
+ cmd = "{} exact-match".format(cmd)
+
+ rmap_data.append(cmd)
+
+ if large_community:
+ if "id" not in large_community:
+ logger.error("'num' is mandatory for "
+ "large-community-list in match "
+ "criteria")
+ return False
+ cmd = "match large-community {}".format(
+ large_community["id"])
+ exact_match = large_community.setdefault(
+ "exact_match", False)
+ if exact_match:
+ cmd = "{} exact-match".format(cmd)
+
+ rmap_data.append(cmd)
+
+ result = create_common_configuration(tgen, router,
+ rmap_data,
+ "route_maps",
+ build=build)
+
+ except InvalidCLIError:
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.debug("Exiting lib API: create_prefix_lists()")
+ return result
+
+
+#############################################
+# Verification APIs
+#############################################
+def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
+ """
+ Data will be read from input_dict or input JSON file, API will generate
+ same prefixes, which were redistributed by either create_static_routes() or
+ advertise_networks_using_network_command() and do will verify next_hop and
+ each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
+ command o/p.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test, for which user wants to test the data
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default: static
+ * `protocol`[optional]: protocol, default = None
+
+ Usage
+ -----
+ # RIB can be verified for static routes OR network advertised using
+ network command. Following are input_dicts to create static routes
+ and advertise networks using network command. Any one of the input_dict
+ can be passed to verify_rib() to verify routes in DUT"s RIB.
+
+ # Creating static routes for r1
+ input_dict = {
+ "r1": {
+ "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
+ "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
+ }}
+ # Advertising networks using network command in router r1
+ input_dict = {
+ "r1": {
+ "advertise_networks": [{"start_ip": "20.0.0.0/32",
+ "no_of_network": 10},
+ {"start_ip": "30.0.0.0/32"}]
+ }}
+ # Verifying ipv4 routes in router r1 learned via BGP
+ dut = "r2"
+ protocol = "bgp"
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_rib()")
+
+ router_list = tgen.routers()
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ # Verifying RIB routes
+ if addr_type == "ipv4":
+ if protocol:
+ command = "show ip route {} json".format(protocol)
+ else:
+ command = "show ip route json"
+ else:
+ if protocol:
+ command = "show ipv6 route {} json".format(protocol)
+ else:
+ command = "show ipv6 route json"
+
+ sleep(2)
+ logger.info("Checking router %s RIB:", router)
+ rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) is False:
+ errormsg = "No {} route found in rib of router {}..". \
+ format(protocol, router)
+ return errormsg
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+ st_found = False
+ nh_found = False
+ found_routes = []
+ missing_routes = []
+ for static_route in static_routes:
+ network = static_route["network"]
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 0
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ if st_rt in rib_routes_json:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if type(next_hop) is not list:
+ next_hop = [next_hop]
+
+ found_hops = [rib_r["ip"] for rib_r in
+ rib_routes_json[st_rt][0][
+ "nexthops"]]
+ for nh in next_hop:
+ nh_found = False
+ if nh and nh in found_hops:
+ nh_found = True
+ else:
+ errormsg = ("Nexthop {} is Missing for {}"
+ " route {} in RIB of router"
+ " {}\n".format(next_hop,
+ protocol,
+ st_rt, dut))
+
+ return errormsg
+ else:
+ missing_routes.append(st_rt)
+ if nh_found:
+ logger.info("Found next_hop %s for all routes in RIB of"
+ " router %s\n", next_hop, dut)
+
+ if not st_found and len(missing_routes) > 0:
+ errormsg = "Missing route in RIB of router {}, routes: " \
+ "{}\n".format(dut, missing_routes)
+ return errormsg
+
+ logger.info("Verified routes in router %s RIB, found routes"
+ " are: %s\n", dut, found_routes)
+
+ advertise_network = input_dict[routerInput].setdefault(
+ "advertise_networks", {})
+ if advertise_network:
+ found_routes = []
+ missing_routes = []
+ found = False
+ for advertise_network_dict in advertise_network:
+ start_ip = advertise_network_dict["network"]
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 0
+
+ # Generating IPs for verification
+ ip_list = generate_ips(start_ip, no_of_network)
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ if st_rt in rib_routes_json:
+ found = True
+ found_routes.append(st_rt)
+ else:
+ missing_routes.append(st_rt)
+
+ if not found and len(missing_routes) > 0:
+ errormsg = "Missing route in RIB of router {}, are: {}" \
+ " \n".format(dut, missing_routes)
+ return errormsg
+
+ logger.info("Verified routes in router %s RIB, found routes"
+ " are: %s", dut, found_routes)
+
+ logger.info("Exiting lib API: verify_rib()")
+ return True
+
+
+def verify_admin_distance_for_static_routes(tgen, input_dict):
+ """
+ API to verify admin distance for static routes as defined in input_dict/
+ input JSON by running show ip/ipv6 route json command.
+
+ Parameter
+ ---------
+ * `tgen` : topogen object
+ * `input_dict`: having details like - for which router and static routes
+ admin dsitance needs to be verified
+ Usage
+ -----
+ # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
+ 10.0.0.2 in router r1
+ input_dict = {
+ "r1": {
+ "static_routes": [{
+ "network": "10.0.20.1/32",
+ "admin_distance": 10,
+ "next_hop": "10.0.0.2"
+ }]
+ }
+ }
+ result = verify_admin_distance_for_static_routes(tgen, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_admin_distance_for_static_routes()")
+
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ for static_route in input_dict[router]["static_routes"]:
+ addr_type = validate_ip_address(static_route["network"])
+ # Command to execute
+ if addr_type == "ipv4":
+ command = "show ip route json"
+ else:
+ command = "show ipv6 route json"
+ show_ip_route_json = rnode.vtysh_cmd(command, isjson=True)
+
+ logger.info("Verifying admin distance for static route %s"
+ " under dut %s:", static_route, router)
+ network = static_route["network"]
+ next_hop = static_route["next_hop"]
+ admin_distance = static_route["admin_distance"]
+ route_data = show_ip_route_json[network][0]
+ if network in show_ip_route_json:
+ if route_data["nexthops"][0]["ip"] == next_hop:
+ if route_data["distance"] != admin_distance:
+ errormsg = ("Verification failed: admin distance"
+ " for static route {} under dut {},"
+ " found:{} but expected:{}".
+ format(static_route, router,
+ route_data["distance"],
+ admin_distance))
+ return errormsg
+ else:
+ logger.info("Verification successful: admin"
+ " distance for static route %s under"
+ " dut %s, found:%s", static_route,
+ router, route_data["distance"])
+
+ else:
+ errormsg = ("Static route {} not found in "
+ "show_ip_route_json for dut {}".
+ format(network, router))
+ return errormsg
+
+ logger.info("Exiting lib API: verify_admin_distance_for_static_routes()")
+ return True
+
+
+def verify_prefix_lists(tgen, input_dict):
+ """
+ Running "show ip prefix-list" command and verifying given prefix-list
+ is present in router.
+
+ Parameters
+ ----------
+ * `tgen` : topogen object
+ * `input_dict`: data to verify prefix lists
+
+ Usage
+ -----
+ # To verify pf_list_1 is present in router r1
+ input_dict = {
+ "r1": {
+ "prefix_lists": ["pf_list_1"]
+ }}
+ result = verify_prefix_lists("ipv4", input_dict, tgen)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_prefix_lists()")
+
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ # Show ip prefix list
+ show_prefix_list = rnode.vtysh_cmd("show ip prefix-list")
+
+ # Verify Prefix list is deleted
+ prefix_lists_addr = input_dict[router]["prefix_lists"]
+ for addr_type in prefix_lists_addr:
+ if not check_address_types(addr_type):
+ continue
+
+ for prefix_list in prefix_lists_addr[addr_type].keys():
+ if prefix_list in show_prefix_list:
+ errormsg = ("Prefix list {} is not deleted from router"
+ " {}".format(prefix_list, router))
+ return errormsg
+
+ logger.info("Prefix list %s is/are deleted successfully"
+ " from router %s", prefix_list, router)
+
+ logger.info("Exiting lib API: verify_prefix_lissts()")
+ return True
diff --git a/tests/topotests/lib/topojson.py b/tests/topotests/lib/topojson.py
new file mode 100644
index 0000000000..4130451d2e
--- /dev/null
+++ b/tests/topotests/lib/topojson.py
@@ -0,0 +1,193 @@
+#
+# Modified work Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Original work Copyright (c) 2018 by Network Device Education
+# Foundation, Inc. ("NetDEF")
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+from collections import OrderedDict
+from json import dumps as json_dumps
+import ipaddr
+import pytest
+
+# Import topogen and topotest helpers
+from lib.topolog import logger
+
+# Required to instantiate the topology builder class.
+from lib.common_config import (
+ number_to_row, number_to_column,
+ load_config_to_router,
+ create_interfaces_cfg,
+ create_static_routes,
+ create_prefix_lists,
+ create_route_maps,
+)
+
+from lib.bgp import create_router_bgp
+
+def build_topo_from_json(tgen, topo):
+ """
+ Reads configuration from JSON file. Adds routers, creates interface
+ names dynamically and link routers as defined in JSON to create
+ topology. Assigns IPs dynamically to all interfaces of each router.
+
+ * `tgen`: Topogen object
+ * `topo`: json file data
+ """
+
+ listRouters = []
+ for routerN in sorted(topo['routers'].iteritems()):
+ logger.info('Topo: Add router {}'.format(routerN[0]))
+ tgen.add_router(routerN[0])
+ listRouters.append(routerN[0])
+
+ listRouters.sort()
+ if 'ipv4base' in topo:
+ ipv4Next = ipaddr.IPv4Address(topo['link_ip_start']['ipv4'])
+ ipv4Step = 2 ** (32 - topo['link_ip_start']['v4mask'])
+ if topo['link_ip_start']['v4mask'] < 32:
+ ipv4Next += 1
+ if 'ipv6base' in topo:
+ ipv6Next = ipaddr.IPv6Address(topo['link_ip_start']['ipv6'])
+ ipv6Step = 2 ** (128 - topo['link_ip_start']['v6mask'])
+ if topo['link_ip_start']['v6mask'] < 127:
+ ipv6Next += 1
+ for router in listRouters:
+ topo['routers'][router]['nextIfname'] = 0
+
+ while listRouters != []:
+ curRouter = listRouters.pop(0)
+ # Physical Interfaces
+ if 'links' in topo['routers'][curRouter]:
+ def link_sort(x):
+ if x == 'lo':
+ return 0
+ elif 'link' in x:
+ return int(x.split('-link')[1])
+ else:
+ return int(x.split('r')[1])
+ for destRouterLink, data in sorted(topo['routers'][curRouter]['links']. \
+ iteritems(),
+ key=lambda x: link_sort(x[0])):
+ currRouter_lo_json = \
+ topo['routers'][curRouter]['links'][destRouterLink]
+ # Loopback interfaces
+ if 'type' in data and data['type'] == 'loopback':
+ if 'ipv4' in currRouter_lo_json and \
+ currRouter_lo_json['ipv4'] == 'auto':
+ currRouter_lo_json['ipv4'] = '{}{}.{}/{}'. \
+ format(topo['lo_prefix']['ipv4'], number_to_row(curRouter), \
+ number_to_column(curRouter), topo['lo_prefix']['v4mask'])
+ if 'ipv6' in currRouter_lo_json and \
+ currRouter_lo_json['ipv6'] == 'auto':
+ currRouter_lo_json['ipv6'] = '{}{}:{}/{}'. \
+ format(topo['lo_prefix']['ipv6'], number_to_row(curRouter), \
+ number_to_column(curRouter), topo['lo_prefix']['v6mask'])
+
+ if "-" in destRouterLink:
+ # Spliting and storing destRouterLink data in tempList
+ tempList = destRouterLink.split("-")
+
+ # destRouter
+ destRouter = tempList.pop(0)
+
+ # Current Router Link
+ tempList.insert(0, curRouter)
+ curRouterLink = "-".join(tempList)
+ else:
+ destRouter = destRouterLink
+ curRouterLink = curRouter
+
+ if destRouter in listRouters:
+ currRouter_link_json = \
+ topo['routers'][curRouter]['links'][destRouterLink]
+ destRouter_link_json = \
+ topo['routers'][destRouter]['links'][curRouterLink]
+
+ # Assigning name to interfaces
+ currRouter_link_json['interface'] = \
+ '{}-{}-eth{}'.format(curRouter, destRouter, topo['routers'] \
+ [curRouter]['nextIfname'])
+ destRouter_link_json['interface'] = \
+ '{}-{}-eth{}'.format(destRouter, curRouter, topo['routers'] \
+ [destRouter]['nextIfname'])
+
+ topo['routers'][curRouter]['nextIfname'] += 1
+ topo['routers'][destRouter]['nextIfname'] += 1
+
+ # Linking routers to each other as defined in JSON file
+ tgen.gears[curRouter].add_link(tgen.gears[destRouter],
+ topo['routers'][curRouter]['links'][destRouterLink] \
+ ['interface'], topo['routers'][destRouter]['links'] \
+ [curRouterLink]['interface'])
+
+ # IPv4
+ if 'ipv4' in currRouter_link_json:
+ if currRouter_link_json['ipv4'] == 'auto':
+ currRouter_link_json['ipv4'] = \
+ '{}/{}'.format(ipv4Next, topo['link_ip_start'][ \
+ 'v4mask'])
+ destRouter_link_json['ipv4'] = \
+ '{}/{}'.format(ipv4Next + 1, topo['link_ip_start'][ \
+ 'v4mask'])
+ ipv4Next += ipv4Step
+ # IPv6
+ if 'ipv6' in currRouter_link_json:
+ if currRouter_link_json['ipv6'] == 'auto':
+ currRouter_link_json['ipv6'] = \
+ '{}/{}'.format(ipv6Next, topo['link_ip_start'][ \
+ 'v6mask'])
+ destRouter_link_json['ipv6'] = \
+ '{}/{}'.format(ipv6Next + 1, topo['link_ip_start'][ \
+ 'v6mask'])
+ ipv6Next = ipaddr.IPv6Address(int(ipv6Next) + ipv6Step)
+
+ logger.debug("Generated link data for router: %s\n%s", curRouter,
+ json_dumps(topo["routers"][curRouter]["links"],
+ indent=4, sort_keys=True))
+
+
+def build_config_from_json(tgen, topo, save_bkup=True):
+ """
+ Reads initial configuraiton from JSON for each router, builds
+ configuration and loads its to router.
+
+ * `tgen`: Topogen object
+ * `topo`: json file data
+ """
+
+ func_dict = OrderedDict([
+ ("links", create_interfaces_cfg),
+ ("static_routes", create_static_routes),
+ ("prefix_lists", create_prefix_lists),
+ ("route_maps", create_route_maps),
+ ("bgp", create_router_bgp)
+ ])
+
+ data = topo["routers"]
+ for func_type in func_dict.keys():
+ logger.info('Building configuration for {}'.format(func_type))
+
+ func_dict.get(func_type)(tgen, data, build=True)
+
+ for router in sorted(topo['routers'].keys()):
+ logger.info('Configuring router {}...'.format(router))
+
+ result = load_config_to_router(tgen, router, save_bkup)
+ if not result:
+ logger.info("Failed while configuring {}".format(router))
+ pytest.exit(1)
+
diff --git a/tests/topotests/lib/topotest.py b/tests/topotests/lib/topotest.py
index 86993665ce..867f9f2f03 100644
--- a/tests/topotests/lib/topotest.py
+++ b/tests/topotests/lib/topotest.py
@@ -959,7 +959,7 @@ class Router(Node):
global fatal_error
- daemonsRunning = self.cmd('vtysh -c "show log" | grep "Logging configuration for"')
+ daemonsRunning = self.cmd('vtysh -c "show logging" | grep "Logging configuration for"')
# Look for AddressSanitizer Errors in vtysh output and append to /tmp/AddressSanitzer.txt if found
if checkAddressSanitizerError(daemonsRunning, self.name, "vtysh"):
return "%s: vtysh killed by AddressSanitizer" % (self.name)
diff --git a/tests/topotests/ospf6-topo1/README.md b/tests/topotests/ospf6-topo1/README.md
index 28f68e8fa5..526c019c6a 100644
--- a/tests/topotests/ospf6-topo1/README.md
+++ b/tests/topotests/ospf6-topo1/README.md
@@ -102,7 +102,7 @@ Simplified `R3` config
Test is executed by running
- vtysh -c "show log" | grep "Logging configuration for"
+ vtysh -c "show logging" | grep "Logging configuration for"
on each FRR router. This should return the logging information for all daemons registered
to Zebra and the list of running daemons is compared to the daemons started for this test (`zebra` and `ospf6d`)
diff --git a/tests/topotests/pytest.ini b/tests/topotests/pytest.ini
index 119ab93857..7ea38491d8 100644
--- a/tests/topotests/pytest.ini
+++ b/tests/topotests/pytest.ini
@@ -10,6 +10,13 @@ norecursedirs = .git example-test lib docker
# value is 'info', but can be changed to 'debug' to provide more details.
#verbosity = info
+# Save logs to log file, by default logs will be displayed to console
+#frrtest_log_dir = /tmp/topotests/
+
+# Display router current configuration during test execution,
+# by default configuration will not be shown
+show_router_config = True
+
# Default daemons binaries path.
#frrdir = /usr/lib/frr
#quaggadir = /usr/lib/quagga