--- /dev/null
+{
+ "ipv4base": "10.0.0.0",
+ "ipv4mask": 30,
+ "ipv6base": "fd00::",
+ "ipv6mask": 64,
+ "link_ip_start": {
+ "ipv4": "10.0.0.0",
+ "v4mask": 30,
+ "ipv6": "fd00::",
+ "v6mask": 64
+ },
+ "lo_prefix": {
+ "ipv4": "1.0.",
+ "v4mask": 32,
+ "ipv6": "2001:DB8:F::",
+ "v6mask": 128
+ },
+ "routers": {
+ "r1": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r4": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r2": {
+ "dest_link": {
+ "r3": {}
+ }
+ },
+ "r4": {
+ "dest_link": {
+ "r3": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r4": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "200",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r4": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
--- /dev/null
+#!/usr/bin/env python
+
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation,
+# Inc. ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test BGP basic functionality:
+
+Test steps
+- Create topology (setup module)
+ Creating 4 routers topology, r1, r2, r3 are in IBGP and
+ r3, r4 are in EBGP
+- Bring up topology
+- Verify for bgp to converge
+- Modify/Delete and verify router-id
+"""
+
+import os
+import sys
+import json
+import time
+import pytest
+from copy import deepcopy
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, '../'))
+sys.path.append(os.path.join(CWD, '../lib/'))
+
+# Required to instantiate the topology builder class.
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib.topogen import Topogen, get_topogen
+from mininet.topo import Topo
+
+from lib.common_config import (
+ start_topology, stop_topology, write_test_header,
+ write_test_footer
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence, create_router_bgp, verify_router_id
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Reading the data from JSON File for topology creation
+jsonFile = "{}/bgp_basic_functionality.json".format(CWD)
+try:
+ with open(jsonFile, 'r') as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ assert False, "Could not read file {}".format(jsonFile)
+
+
+class CreateTopo(Topo):
+ """
+ Test BasicTopo - topology 1
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ """Build function"""
+ tgen = get_topogen(self)
+
+ # Building topology from json file
+ build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("=" * 40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(CreateTopo, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ global BGP_CONVERGENCE
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}". \
+ format(BGP_CONVERGENCE)
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module():
+ """Teardown the pytest environment"""
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ stop_topology(tgen)
+
+ logger.info("Testsuite end time: {}".
+ format(time.asctime(time.localtime(time.time()))))
+ logger.info("=" * 40)
+
+
+#####################################################
+#
+# Testcases
+#
+#####################################################
+
+
+def test_modify_and_delete_router_id(request):
+ """ Test to modify, delete and verify router-id. """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Modify router id
+ input_dict = {
+ 'r1': {
+ "bgp": {
+ 'router_id': '12.12.12.12'
+ }
+ },
+ 'r2': {
+ "bgp": {
+ 'router_id': '22.22.22.22'
+ }
+ },
+ 'r3': {
+ "bgp": {
+ 'router_id': '33.33.33.33'
+ }
+ },
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}".\
+ format(tc_name, result)
+
+ # Verifying router id once modified
+ result = verify_router_id(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}".\
+ format(tc_name, result)
+
+ # Delete router id
+ input_dict = {
+ 'r1': {
+ "bgp": {
+ 'del_router_id': True
+ }
+ },
+ 'r2': {
+ "bgp": {
+ 'del_router_id': True
+ }
+ },
+ 'r3': {
+ "bgp": {
+ 'del_router_id': True
+ }
+ },
+ }
+ result = create_router_bgp(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Verifying router id once deleted
+ # Once router-id is deleted, highest interface ip should become
+ # router-id
+ result = verify_router_id(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+if __name__ == '__main__':
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
InvalidCLIError,
load_config_to_router,
check_address_types,
- generate_ips)
+ generate_ips,
+ find_interface_with_greater_ip)
BGP_CONVERGENCE_TIMEOUT = 10
return config_data
+
+#############################################
+# Verification APIs
+#############################################
+def verify_router_id(tgen, topo, input_dict):
+ """
+ Running command "show ip bgp json" for DUT and reading router-id
+ from input_dict and verifying with command output.
+ 1. Statically modfified router-id should take place
+ 2. When static router-id is deleted highest loopback should
+ become router-id
+ 3. When loopback intf is down then highest physcial intf
+ should become router-id
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `input_dict`: input dictionary, have details of Device Under Test, for
+ which user wants to test the data
+ Usage
+ -----
+ # Verify if router-id for r1 is 12.12.12.12
+ input_dict = {
+ "r1":{
+ "router_id": "12.12.12.12"
+ }
+ # Verify that router-id for r1 is highest interface ip
+ input_dict = {
+ "routers": ["r1"]
+ }
+ result = verify_router_id(tgen, topo, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_router_id()")
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ del_router_id = input_dict[router]["bgp"].setdefault(
+ "del_router_id", False)
+
+ logger.info("Checking router %s router-id", router)
+ show_bgp_json = rnode.vtysh_cmd("show ip bgp json",
+ isjson=True)
+ router_id_out = show_bgp_json["routerId"]
+ router_id_out = ipaddr.IPv4Address(unicode(router_id_out))
+
+ # Once router-id is deleted, highest interface ip should become
+ # router-id
+ if del_router_id:
+ router_id = find_interface_with_greater_ip(topo, router)
+ else:
+ router_id = input_dict[router]["bgp"]["router_id"]
+ router_id = ipaddr.IPv4Address(unicode(router_id))
+
+ if router_id == router_id_out:
+ logger.info("Found expected router-id %s for router %s",
+ router_id, router)
+ else:
+ errormsg = "Router-id for router:{} mismatch, expected:" \
+ " {} but found:{}".format(router, router_id,
+ router_id_out)
+ return errormsg
+
+ logger.info("Exiting lib API: verify_router_id()")
+ return True
+
+
+def verify_bgp_convergence(tgen, topo):
+ """
+ API will verify if BGP is converged with in the given time frame.
+ Running "show bgp summary json" command and verify bgp neighbor
+ state is established,
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type`: ip_type, ipv4/ipv6
+
+ Usage
+ -----
+ # To veriry is BGP is converged for all the routers used in
+ topology
+ results = verify_bgp_convergence(tgen, topo, "ipv4")
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_bgp_confergence()")
+ for router, rnode in tgen.routers().iteritems():
+ logger.info("Verifying BGP Convergence on router %s:", router)
+
+ for retry in range(1, 11):
+ show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+ # Verifying output dictionary show_bgp_json is empty or not
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
+ # To find neighbor ip type
+ total_peer = 0
+
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+ for addr_type in bgp_addr_type.keys():
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor in bgp_neighbors:
+ total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+
+ for addr_type in bgp_addr_type.keys():
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ no_of_peer = 0
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link in peer_data["dest_link"].keys():
+ data = topo["routers"][bgp_neighbor]["links"]
+ if dest_link in data:
+ neighbor_ip = \
+ data[dest_link][addr_type].split("/")[0]
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"][
+ "peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"][
+ "peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ if nh_state == "Established":
+ no_of_peer += 1
+ if no_of_peer == total_peer:
+ logger.info("BGP is Converged for router %s", router)
+ break
+ else:
+ logger.warning("BGP is not yet Converged for router %s",
+ router)
+ sleeptime = 2 * retry
+ if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
+ # Waiting for BGP to converge
+ logger.info("Waiting for %s sec for BGP to converge on"
+ " router %s...", sleeptime, router)
+ sleep(sleeptime)
+ else:
+ show_bgp_summary = rnode.vtysh_cmd("show bgp summary")
+ errormsg = "TIMEOUT!! BGP is not converged in {} " \
+ "seconds for router {} \n {}".format(
+ BGP_CONVERGENCE_TIMEOUT, router,
+ show_bgp_summary)
+ return errormsg
+
+ logger.info("Exiting API: verify_bgp_confergence()")
+ return True
+
config_map = OrderedDict({
"general_config": "! FRR General Config\n",
- "interface_config": "! Interfaces Config\n"
+ "interface_config": "! Interfaces Config\n",
+ "bgp": "! BGP Config\n"
})
if build:
return ipaddress_list
+def find_interface_with_greater_ip(topo, router, loopback=True,
+ interface=True):
+ """
+ Returns highest interface ip for ipv4/ipv6. If loopback is there then
+ it will return highest IP from loopback IPs otherwise from physical
+ interface IPs.
+
+ * `topo` : json file data
+ * `router` : router for which hightest interface should be calculated
+ """
+
+ link_data = topo["routers"][router]["links"]
+ lo_list = []
+ interfaces_list = []
+ lo_exists = False
+ for destRouterLink, data in sorted(link_data.iteritems()):
+ if loopback:
+ if "type" in data and data["type"] == "loopback":
+ lo_exists = True
+ ip_address = topo["routers"][router]["links"][
+ destRouterLink]["ipv4"].split("/")[0]
+ lo_list.append(ip_address)
+ if interface:
+ ip_address = topo["routers"][router]["links"][
+ destRouterLink]["ipv4"].split("/")[0]
+ interfaces_list.append(ip_address)
+
+ if lo_exists:
+ return sorted(lo_list)[-1]
+
+ return sorted(interfaces_list)[-1]
+
+
+def write_test_header(tc_name):
+ """ Display message at beginning of test case"""
+ count = 20
+ logger.info("*"*(len(tc_name)+count))
+ logger.info("START -> Testcase : %s", tc_name)
+ logger.info("*"*(len(tc_name)+count))
+
+
+def write_test_footer(tc_name):
+ """ Display message at end of test case"""
+ count = 21
+ logger.info("="*(len(tc_name)+count))
+ logger.info("PASSED -> Testcase : %s", tc_name)
+ logger.info("="*(len(tc_name)+count))
+
+
#############################################
# These APIs, will used by testcase
#############################################
create_interfaces_cfg
)
+from lib.bgp import create_router_bgp
def build_topo_from_json(tgen, topo):
"""
logger.debug("Generated link data for router: %s\n%s", curRouter,
json_dumps(topo["routers"][curRouter]["links"],
- indent=4, sort_keys=True)
+ indent=4, sort_keys=True))
+
def build_config_from_json(tgen, topo, save_bkup=True):
"""
func_dict = OrderedDict([
("links", create_interfaces_cfg),
+ ("bgp", create_router_bgp)
])
data = topo["routers"]