summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/common_config.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/common_config.py')
-rw-r--r--tests/topotests/lib/common_config.py125
1 files changed, 66 insertions, 59 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index 1fa6d35101..d83f946c42 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -400,6 +400,7 @@ def check_router_status(tgen):
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
+
def getStrIO():
"""
Return a StringIO object appropriate for the current python version.
@@ -409,6 +410,7 @@ def getStrIO():
else:
return StringIO.StringIO()
+
def reset_config_on_routers(tgen, routerName=None):
"""
Resets configuration on routers to the snapshot created using input JSON
@@ -702,14 +704,14 @@ def start_topology(tgen, daemon=None):
)
TMPDIR = os.path.join(LOGDIR, tgen.modname)
- linux_ver = ''
+ linux_ver = ""
router_list = tgen.routers()
for rname in ROUTER_LIST:
router = router_list[rname]
# It will help in debugging the failures, will give more details on which
# specific kernel version tests are failing
- if linux_ver == '':
+ if linux_ver == "":
linux_ver = router.run("uname -a")
logger.info("Logging platform related details: \n %s \n", linux_ver)
@@ -741,11 +743,10 @@ def start_topology(tgen, daemon=None):
# Loading empty bgpd.conf file to router, to start the bgp deamon
router.load_config(TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(TMPDIR, rname))
- if daemon and 'ospfd' in daemon:
+ if daemon and "ospfd" in daemon:
# Loading empty ospf.conf file to router, to start the bgp deamon
router.load_config(
- TopoRouter.RD_OSPF,
- '{}/{}/ospfd.conf'.format(TMPDIR, rname)
+ TopoRouter.RD_OSPF, "{}/{}/ospfd.conf".format(TMPDIR, rname)
)
# Starting routers
logger.info("Starting all routers once topology is created")
@@ -831,8 +832,8 @@ def topo_daemons(tgen, topo):
)
for rtr in ROUTER_LIST:
- if 'ospf' in topo['routers'][rtr] and 'ospfd' not in daemon_list:
- daemon_list.append('ospfd')
+ if "ospf" in topo["routers"][rtr] and "ospfd" not in daemon_list:
+ daemon_list.append("ospfd")
return daemon_list
@@ -1266,8 +1267,7 @@ def interface_status(tgen, topo, input_dict):
return True
-def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0,
- return_is_dict=False):
+def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict=False):
"""
Retries function execution, if return is an errormsg or exception
@@ -1279,11 +1279,10 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0,
"""
def _retry(func):
-
@wraps(func)
def func_retry(*args, **kwargs):
- _wait = kwargs.pop('wait', wait)
- _attempts = kwargs.pop('attempts', attempts)
+ _wait = kwargs.pop("wait", wait)
+ _attempts = kwargs.pop("attempts", attempts)
_attempts = int(_attempts)
expected = True
if _attempts < 0:
@@ -1293,19 +1292,21 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0,
logger.info("Waiting for [%s]s as initial delay", initial_wait)
sleep(initial_wait)
- _return_is_str = kwargs.pop('return_is_str', return_is_str)
- _return_is_dict = kwargs.pop('return_is_str', return_is_dict)
+ _return_is_str = kwargs.pop("return_is_str", return_is_str)
+ _return_is_dict = kwargs.pop("return_is_str", return_is_dict)
for i in range(1, _attempts + 1):
try:
- _expected = kwargs.setdefault('expected', True)
+ _expected = kwargs.setdefault("expected", True)
if _expected is False:
expected = _expected
- kwargs.pop('expected')
+ kwargs.pop("expected")
ret = func(*args, **kwargs)
logger.debug("Function returned %s", ret)
if _return_is_str and isinstance(ret, bool) and _expected:
return ret
- if (isinstance(ret, str) or isinstance(ret, unicode)) and _expected is False:
+ if (
+ isinstance(ret, str) or isinstance(ret, unicode)
+ ) and _expected is False:
return ret
if _return_is_dict and isinstance(ret, dict):
return ret
@@ -1316,17 +1317,17 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0,
except Exception as err:
if _attempts == i and expected:
generate_support_bundle()
- logger.info("Max number of attempts (%r) reached",
- _attempts)
+ logger.info("Max number of attempts (%r) reached", _attempts)
raise
else:
logger.info("Function returned %s", err)
if i < _attempts:
- logger.info("Retry [#%r] after sleeping for %ss"
- % (i, _wait))
+ logger.info("Retry [#%r] after sleeping for %ss" % (i, _wait))
sleep(_wait)
+
func_retry._original = func
return func_retry
+
return _retry
@@ -1420,58 +1421,63 @@ def create_interfaces_cfg(tgen, topo, build=False):
else:
interface_data.append("ipv6 address {}\n".format(intf_addr))
- if 'ospf' in data:
- ospf_data = data['ospf']
- if 'area' in ospf_data:
- intf_ospf_area = c_data["links"][destRouterLink][
- "ospf"]["area"]
+ if "ospf" in data:
+ ospf_data = data["ospf"]
+ if "area" in ospf_data:
+ intf_ospf_area = c_data["links"][destRouterLink]["ospf"]["area"]
if "delete" in data and data["delete"]:
interface_data.append("no ip ospf area")
else:
- interface_data.append("ip ospf area {}".format(
- intf_ospf_area
- ))
+ interface_data.append(
+ "ip ospf area {}".format(intf_ospf_area)
+ )
- if "hello_interval" in ospf_data:
- intf_ospf_hello = c_data["links"][destRouterLink][
- "ospf"]["hello_interval"]
+ if "hello_interval" in ospf_data:
+ intf_ospf_hello = c_data["links"][destRouterLink]["ospf"][
+ "hello_interval"
+ ]
if "delete" in data and data["delete"]:
- interface_data.append("no ip ospf "\
- " hello-interval")
+ interface_data.append("no ip ospf " " hello-interval")
else:
- interface_data.append("ip ospf "\
- " hello-interval {}".format(intf_ospf_hello))
+ interface_data.append(
+ "ip ospf " " hello-interval {}".format(intf_ospf_hello)
+ )
if "dead_interval" in ospf_data:
- intf_ospf_dead = c_data["links"][destRouterLink][
- "ospf"]["dead_interval"]
+ intf_ospf_dead = c_data["links"][destRouterLink]["ospf"][
+ "dead_interval"
+ ]
if "delete" in data and data["delete"]:
- interface_data.append("no ip ospf"\
- " dead-interval")
+ interface_data.append("no ip ospf" " dead-interval")
else:
- interface_data.append("ip ospf "\
- " dead-interval {}".format(intf_ospf_dead))
+ interface_data.append(
+ "ip ospf " " dead-interval {}".format(intf_ospf_dead)
+ )
if "network" in ospf_data:
- intf_ospf_nw = c_data["links"][destRouterLink][
- "ospf"]["network"]
+ intf_ospf_nw = c_data["links"][destRouterLink]["ospf"][
+ "network"
+ ]
if "delete" in data and data["delete"]:
- interface_data.append("no ip ospf"\
- " network {}".format(intf_ospf_nw))
+ interface_data.append(
+ "no ip ospf" " network {}".format(intf_ospf_nw)
+ )
else:
- interface_data.append("ip ospf"\
- " network {}".format(intf_ospf_nw))
+ interface_data.append(
+ "ip ospf" " network {}".format(intf_ospf_nw)
+ )
if "priority" in ospf_data:
- intf_ospf_nw = c_data["links"][destRouterLink][
- "ospf"]["priority"]
+ intf_ospf_nw = c_data["links"][destRouterLink]["ospf"][
+ "priority"
+ ]
if "delete" in data and data["delete"]:
- interface_data.append("no ip ospf"\
- " priority")
+ interface_data.append("no ip ospf" " priority")
else:
- interface_data.append("ip ospf"\
- " priority {}".format(intf_ospf_nw))
+ interface_data.append(
+ "ip ospf" " priority {}".format(intf_ospf_nw)
+ )
result = create_common_configuration(
tgen, c_router, interface_data, "interface_config", build=build
)
@@ -3013,7 +3019,7 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
for st_rt in ip_list:
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
- #st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+ # st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
@@ -3118,7 +3124,7 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
nh_found = False
for st_rt in ip_list:
- #st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+ # st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
@@ -4075,8 +4081,9 @@ def required_linux_kernel_version(required_version):
"""
system_kernel = platform.release()
if version_cmp(system_kernel, required_version) < 0:
- error_msg = ('These tests will not run on kernel "{}", '
- 'they require kernel >= {})'.format(system_kernel,
- required_version ))
+ error_msg = (
+ 'These tests will not run on kernel "{}", '
+ "they require kernel >= {})".format(system_kernel, required_version)
+ )
return error_msg
return True