diff options
Diffstat (limited to 'tests/topotests/lib/common_config.py')
| -rw-r--r-- | tests/topotests/lib/common_config.py | 125 |
1 files changed, 66 insertions, 59 deletions
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index 1fa6d35101..d83f946c42 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -400,6 +400,7 @@ def check_router_status(tgen): logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True + def getStrIO(): """ Return a StringIO object appropriate for the current python version. @@ -409,6 +410,7 @@ def getStrIO(): else: return StringIO.StringIO() + def reset_config_on_routers(tgen, routerName=None): """ Resets configuration on routers to the snapshot created using input JSON @@ -702,14 +704,14 @@ def start_topology(tgen, daemon=None): ) TMPDIR = os.path.join(LOGDIR, tgen.modname) - linux_ver = '' + linux_ver = "" router_list = tgen.routers() for rname in ROUTER_LIST: router = router_list[rname] # It will help in debugging the failures, will give more details on which # specific kernel version tests are failing - if linux_ver == '': + if linux_ver == "": linux_ver = router.run("uname -a") logger.info("Logging platform related details: \n %s \n", linux_ver) @@ -741,11 +743,10 @@ def start_topology(tgen, daemon=None): # Loading empty bgpd.conf file to router, to start the bgp deamon router.load_config(TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(TMPDIR, rname)) - if daemon and 'ospfd' in daemon: + if daemon and "ospfd" in daemon: # Loading empty ospf.conf file to router, to start the bgp deamon router.load_config( - TopoRouter.RD_OSPF, - '{}/{}/ospfd.conf'.format(TMPDIR, rname) + TopoRouter.RD_OSPF, "{}/{}/ospfd.conf".format(TMPDIR, rname) ) # Starting routers logger.info("Starting all routers once topology is created") @@ -831,8 +832,8 @@ def topo_daemons(tgen, topo): ) for rtr in ROUTER_LIST: - if 'ospf' in topo['routers'][rtr] and 'ospfd' not in daemon_list: - daemon_list.append('ospfd') + if "ospf" in topo["routers"][rtr] and "ospfd" not in daemon_list: + daemon_list.append("ospfd") return daemon_list @@ -1266,8 +1267,7 @@ def interface_status(tgen, topo, input_dict): return True -def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, - return_is_dict=False): +def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict=False): """ Retries function execution, if return is an errormsg or exception @@ -1279,11 +1279,10 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, """ def _retry(func): - @wraps(func) def func_retry(*args, **kwargs): - _wait = kwargs.pop('wait', wait) - _attempts = kwargs.pop('attempts', attempts) + _wait = kwargs.pop("wait", wait) + _attempts = kwargs.pop("attempts", attempts) _attempts = int(_attempts) expected = True if _attempts < 0: @@ -1293,19 +1292,21 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, logger.info("Waiting for [%s]s as initial delay", initial_wait) sleep(initial_wait) - _return_is_str = kwargs.pop('return_is_str', return_is_str) - _return_is_dict = kwargs.pop('return_is_str', return_is_dict) + _return_is_str = kwargs.pop("return_is_str", return_is_str) + _return_is_dict = kwargs.pop("return_is_str", return_is_dict) for i in range(1, _attempts + 1): try: - _expected = kwargs.setdefault('expected', True) + _expected = kwargs.setdefault("expected", True) if _expected is False: expected = _expected - kwargs.pop('expected') + kwargs.pop("expected") ret = func(*args, **kwargs) logger.debug("Function returned %s", ret) if _return_is_str and isinstance(ret, bool) and _expected: return ret - if (isinstance(ret, str) or isinstance(ret, unicode)) and _expected is False: + if ( + isinstance(ret, str) or isinstance(ret, unicode) + ) and _expected is False: return ret if _return_is_dict and isinstance(ret, dict): return ret @@ -1316,17 +1317,17 @@ def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, except Exception as err: if _attempts == i and expected: generate_support_bundle() - logger.info("Max number of attempts (%r) reached", - _attempts) + logger.info("Max number of attempts (%r) reached", _attempts) raise else: logger.info("Function returned %s", err) if i < _attempts: - logger.info("Retry [#%r] after sleeping for %ss" - % (i, _wait)) + logger.info("Retry [#%r] after sleeping for %ss" % (i, _wait)) sleep(_wait) + func_retry._original = func return func_retry + return _retry @@ -1420,58 +1421,63 @@ def create_interfaces_cfg(tgen, topo, build=False): else: interface_data.append("ipv6 address {}\n".format(intf_addr)) - if 'ospf' in data: - ospf_data = data['ospf'] - if 'area' in ospf_data: - intf_ospf_area = c_data["links"][destRouterLink][ - "ospf"]["area"] + if "ospf" in data: + ospf_data = data["ospf"] + if "area" in ospf_data: + intf_ospf_area = c_data["links"][destRouterLink]["ospf"]["area"] if "delete" in data and data["delete"]: interface_data.append("no ip ospf area") else: - interface_data.append("ip ospf area {}".format( - intf_ospf_area - )) + interface_data.append( + "ip ospf area {}".format(intf_ospf_area) + ) - if "hello_interval" in ospf_data: - intf_ospf_hello = c_data["links"][destRouterLink][ - "ospf"]["hello_interval"] + if "hello_interval" in ospf_data: + intf_ospf_hello = c_data["links"][destRouterLink]["ospf"][ + "hello_interval" + ] if "delete" in data and data["delete"]: - interface_data.append("no ip ospf "\ - " hello-interval") + interface_data.append("no ip ospf " " hello-interval") else: - interface_data.append("ip ospf "\ - " hello-interval {}".format(intf_ospf_hello)) + interface_data.append( + "ip ospf " " hello-interval {}".format(intf_ospf_hello) + ) if "dead_interval" in ospf_data: - intf_ospf_dead = c_data["links"][destRouterLink][ - "ospf"]["dead_interval"] + intf_ospf_dead = c_data["links"][destRouterLink]["ospf"][ + "dead_interval" + ] if "delete" in data and data["delete"]: - interface_data.append("no ip ospf"\ - " dead-interval") + interface_data.append("no ip ospf" " dead-interval") else: - interface_data.append("ip ospf "\ - " dead-interval {}".format(intf_ospf_dead)) + interface_data.append( + "ip ospf " " dead-interval {}".format(intf_ospf_dead) + ) if "network" in ospf_data: - intf_ospf_nw = c_data["links"][destRouterLink][ - "ospf"]["network"] + intf_ospf_nw = c_data["links"][destRouterLink]["ospf"][ + "network" + ] if "delete" in data and data["delete"]: - interface_data.append("no ip ospf"\ - " network {}".format(intf_ospf_nw)) + interface_data.append( + "no ip ospf" " network {}".format(intf_ospf_nw) + ) else: - interface_data.append("ip ospf"\ - " network {}".format(intf_ospf_nw)) + interface_data.append( + "ip ospf" " network {}".format(intf_ospf_nw) + ) if "priority" in ospf_data: - intf_ospf_nw = c_data["links"][destRouterLink][ - "ospf"]["priority"] + intf_ospf_nw = c_data["links"][destRouterLink]["ospf"][ + "priority" + ] if "delete" in data and data["delete"]: - interface_data.append("no ip ospf"\ - " priority") + interface_data.append("no ip ospf" " priority") else: - interface_data.append("ip ospf"\ - " priority {}".format(intf_ospf_nw)) + interface_data.append( + "ip ospf" " priority {}".format(intf_ospf_nw) + ) result = create_common_configuration( tgen, c_router, interface_data, "interface_config", build=build ) @@ -3013,7 +3019,7 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None): for st_rt in ip_list: st_rt = str(ipaddress.ip_network(frr_unicode(st_rt))) - #st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + # st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) _addr_type = validate_ip_address(st_rt) if _addr_type != addr_type: @@ -3118,7 +3124,7 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None): nh_found = False for st_rt in ip_list: - #st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + # st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) st_rt = str(ipaddress.ip_network(frr_unicode(st_rt))) _addr_type = validate_ip_address(st_rt) @@ -4075,8 +4081,9 @@ def required_linux_kernel_version(required_version): """ system_kernel = platform.release() if version_cmp(system_kernel, required_version) < 0: - error_msg = ('These tests will not run on kernel "{}", ' - 'they require kernel >= {})'.format(system_kernel, - required_version )) + error_msg = ( + 'These tests will not run on kernel "{}", ' + "they require kernel >= {})".format(system_kernel, required_version) + ) return error_msg return True |
