diff options
| author | reformat <reformat@nobody.nobody> | 2020-04-03 14:05:24 +0300 |
|---|---|---|
| committer | Donatas Abraitis <donatas.abraitis@gmail.com> | 2020-04-03 19:41:28 +0300 |
| commit | 787e762445d50ca5b52fafcf8dd6de08ab90916f (patch) | |
| tree | afaad3d41a83da180d5fc8bbc7b23d02da7c4dbd /tests/topotests/lib/topogen.py | |
| parent | cd05906c41aaaf94e06fa9d14a7634bdffd99ed0 (diff) | |
tests: Run python formatter (black) for topotests
Mostly ' => ", whitespace changes.
Using https://github.com/psf/black
Signed-off-by: reformat <reformat@nobody.nobody>
Diffstat (limited to 'tests/topotests/lib/topogen.py')
| -rw-r--r-- | tests/topotests/lib/topogen.py | 444 |
1 files changed, 249 insertions, 195 deletions
diff --git a/tests/topotests/lib/topogen.py b/tests/topotests/lib/topogen.py index 6859f5a076..6a6bbc7c78 100644 --- a/tests/topotests/lib/topogen.py +++ b/tests/topotests/lib/topogen.py @@ -70,6 +70,7 @@ CWD = os.path.dirname(os.path.realpath(__file__)) # all test functions without declaring a test local variable. global_tgen = None + def get_topogen(topo=None): """ Helper function to retrieve Topogen. Must be called with `topo` when called @@ -79,31 +80,34 @@ def get_topogen(topo=None): global_tgen.topo = topo return global_tgen + def set_topogen(tgen): "Helper function to set Topogen" # pylint: disable=W0603 global global_tgen global_tgen = tgen + # # Main class: topology builder # # Topogen configuration defaults tgen_defaults = { - 'verbosity': 'info', - 'frrdir': '/usr/lib/frr', - 'quaggadir': '/usr/lib/quagga', - 'routertype': 'frr', - 'memleak_path': None, + "verbosity": "info", + "frrdir": "/usr/lib/frr", + "quaggadir": "/usr/lib/quagga", + "routertype": "frr", + "memleak_path": None, } + class Topogen(object): "A topology test builder helper." - CONFIG_SECTION = 'topogen' + CONFIG_SECTION = "topogen" - def __init__(self, cls, modname='unnamed'): + def __init__(self, cls, modname="unnamed"): """ Topogen initialization function, takes the following arguments: * `cls`: the topology class that is child of mininet.topo @@ -117,16 +121,16 @@ class Topogen(object): self.switchn = 1 self.modname = modname self.errorsd = {} - self.errors = '' + self.errors = "" self.peern = 1 self._init_topo(cls) - logger.info('loading topology: {}'.format(self.modname)) + logger.info("loading topology: {}".format(self.modname)) @staticmethod def _mininet_reset(): "Reset the mininet environment" # Clean up the mininet environment - os.system('mn -c > /dev/null 2>&1') + os.system("mn -c > /dev/null 2>&1") def _init_topo(self, cls): """ @@ -138,10 +142,10 @@ class Topogen(object): # Test for MPLS Kernel modules available self.hasmpls = False - if not topotest.module_present('mpls-router'): - logger.info('MPLS tests will not run (missing mpls-router kernel module)') - elif not topotest.module_present('mpls-iptunnel'): - logger.info('MPLS tests will not run (missing mpls-iptunnel kernel module)') + if not topotest.module_present("mpls-router"): + logger.info("MPLS tests will not run (missing mpls-router kernel module)") + elif not topotest.module_present("mpls-iptunnel"): + logger.info("MPLS tests will not run (missing mpls-iptunnel kernel module)") else: self.hasmpls = True # Load the default topology configurations @@ -160,7 +164,7 @@ class Topogen(object): topotests. """ self.config = configparser.ConfigParser(tgen_defaults) - pytestini_path = os.path.join(CWD, '../pytest.ini') + pytestini_path = os.path.join(CWD, "../pytest.ini") self.config.read(pytestini_path) def add_router(self, name=None, cls=topotest.Router, **params): @@ -173,15 +177,15 @@ class Topogen(object): Returns a TopoRouter. """ if name is None: - name = 'r{}'.format(self.routern) + name = "r{}".format(self.routern) if name in self.gears: - raise KeyError('router already exists') + raise KeyError("router already exists") - params['frrdir'] = self.config.get(self.CONFIG_SECTION, 'frrdir') - params['quaggadir'] = self.config.get(self.CONFIG_SECTION, 'quaggadir') - params['memleak_path'] = self.config.get(self.CONFIG_SECTION, 'memleak_path') - if not params.has_key('routertype'): - params['routertype'] = self.config.get(self.CONFIG_SECTION, 'routertype') + params["frrdir"] = self.config.get(self.CONFIG_SECTION, "frrdir") + params["quaggadir"] = self.config.get(self.CONFIG_SECTION, "quaggadir") + params["memleak_path"] = self.config.get(self.CONFIG_SECTION, "memleak_path") + if not params.has_key("routertype"): + params["routertype"] = self.config.get(self.CONFIG_SECTION, "routertype") self.gears[name] = TopoRouter(self, cls, name, **params) self.routern += 1 @@ -195,9 +199,9 @@ class Topogen(object): Returns the switch name and number. """ if name is None: - name = 's{}'.format(self.switchn) + name = "s{}".format(self.switchn) if name in self.gears: - raise KeyError('switch already exists') + raise KeyError("switch already exists") self.gears[name] = TopoSwitch(self, cls, name) self.switchn += 1 @@ -211,9 +215,9 @@ class Topogen(object): * `defaultRoute`: the peer default route (e.g. 'via 1.2.3.1') """ if name is None: - name = 'peer{}'.format(self.peern) + name = "peer{}".format(self.peern) if name in self.gears: - raise KeyError('exabgp peer already exists') + raise KeyError("exabgp peer already exists") self.gears[name] = TopoExaBGP(self, name, ip=ip, defaultRoute=defaultRoute) self.peern += 1 @@ -228,9 +232,9 @@ class Topogen(object): * TopoSwitch """ if not isinstance(node1, TopoGear): - raise ValueError('invalid node1 type') + raise ValueError("invalid node1 type") if not isinstance(node2, TopoGear): - raise ValueError('invalid node2 type') + raise ValueError("invalid node2 type") if ifname1 is None: ifname1 = node1.new_link() @@ -239,8 +243,7 @@ class Topogen(object): node1.register_link(ifname1, node2, ifname2) node2.register_link(ifname2, node1, ifname1) - self.topo.addLink(node1.name, node2.name, - intfName1=ifname1, intfName2=ifname2) + self.topo.addLink(node1.name, node2.name, intfName1=ifname1, intfName2=ifname2) def get_gears(self, geartype): """ @@ -262,8 +265,11 @@ class Topogen(object): # Do stuff ``` """ - return dict((name, gear) for name, gear in self.gears.iteritems() - if isinstance(gear, geartype)) + return dict( + (name, gear) + for name, gear in self.gears.iteritems() + if isinstance(gear, geartype) + ) def routers(self): """ @@ -291,16 +297,16 @@ class Topogen(object): """ # If log_level is not specified use the configuration. if log_level is None: - log_level = self.config.get(self.CONFIG_SECTION, 'verbosity') + log_level = self.config.get(self.CONFIG_SECTION, "verbosity") # Set python logger level logger_config.set_log_level(log_level) # Run mininet - if log_level == 'debug': + if log_level == "debug": setLogLevel(log_level) - logger.info('starting topology: {}'.format(self.modname)) + logger.info("starting topology: {}".format(self.modname)) self.net.start() def start_router(self, router=None): @@ -326,7 +332,7 @@ class Topogen(object): first is a simple kill with no sleep, the second will sleep if not killed and try with a different signal. """ - logger.info('stopping topology: {}'.format(self.modname)) + logger.info("stopping topology: {}".format(self.modname)) errors = "" for gear in self.gears.values(): gear.stop(False, False) @@ -344,7 +350,8 @@ class Topogen(object): """ if not sys.stdin.isatty(): raise EnvironmentError( - 'you must run pytest with \'-s\' in order to use mininet CLI') + "you must run pytest with '-s' in order to use mininet CLI" + ) CLI(self.net) @@ -354,8 +361,9 @@ class Topogen(object): if self.routers_have_failure(): return False - memleak_file = (os.environ.get('TOPOTESTS_CHECK_MEMLEAK') or - self.config.get(self.CONFIG_SECTION, 'memleak_path')) + memleak_file = os.environ.get("TOPOTESTS_CHECK_MEMLEAK") or self.config.get( + self.CONFIG_SECTION, "memleak_path" + ) if memleak_file is None: return False return True @@ -382,7 +390,7 @@ class Topogen(object): code = len(self.errorsd) self.errorsd[code] = message - self.errors += '\n{}: {}'.format(code, message) + self.errors += "\n{}: {}".format(code, message) def has_errors(self): "Returns whether errors exist or not." @@ -393,23 +401,25 @@ class Topogen(object): if self.has_errors(): return True - errors = '' + errors = "" router_list = self.routers().values() for router in router_list: result = router.check_router_running() - if result != '': - errors += result + '\n' + if result != "": + errors += result + "\n" - if errors != '': - self.set_error(errors, 'router_error') + if errors != "": + self.set_error(errors, "router_error") assert False, errors return True return False + # # Topology gears (equipment) # + class TopoGear(object): "Abstract class for type checking" @@ -421,11 +431,11 @@ class TopoGear(object): self.linkn = 0 def __str__(self): - links = '' + links = "" for myif, dest in self.links.iteritems(): _, destif = dest - if links != '': - links += ',' + if links != "": + links += "," links += '"{}"<->"{}"'.format(myif, destif) return 'TopoGear<name="{}",links=[{}]>'.format(self.name, links) @@ -462,20 +472,22 @@ class TopoGear(object): enabled: whether we should enable or disable the interface """ if myif not in self.links.keys(): - raise KeyError('interface doesn\'t exists') + raise KeyError("interface doesn't exists") if enabled is True: - operation = 'up' + operation = "up" else: - operation = 'down' + operation = "down" - logger.info('setting node "{}" link "{}" to state "{}"'.format( - self.name, myif, operation - )) - extract='' + logger.info( + 'setting node "{}" link "{}" to state "{}"'.format( + self.name, myif, operation + ) + ) + extract = "" if netns is not None: - extract = 'ip netns exec {} '.format(netns) - return self.run('{}ip link set dev {} {}'.format(extract, myif, operation)) + extract = "ip netns exec {} ".format(netns) + return self.run("{}ip link set dev {} {}".format(extract, myif, operation)) def peer_link_enable(self, myif, enabled=True, netns=None): """ @@ -487,7 +499,7 @@ class TopoGear(object): peer disables their interface our interface status changes to no link. """ if myif not in self.links.keys(): - raise KeyError('interface doesn\'t exists') + raise KeyError("interface doesn't exists") node, nodeif = self.links[myif] node.link_enable(nodeif, enabled, netns) @@ -498,7 +510,7 @@ class TopoGear(object): NOTE: This function should only be called by Topogen. """ - ifname = '{}-eth{}'.format(self.name, self.linkn) + ifname = "{}-eth{}".format(self.name, self.linkn) self.linkn += 1 return ifname @@ -509,10 +521,11 @@ class TopoGear(object): NOTE: This function should only be called by Topogen. """ if myif in self.links.keys(): - raise KeyError('interface already exists') + raise KeyError("interface already exists") self.links[myif] = (node, nodeif) + class TopoRouter(TopoGear): """ Router abstraction. @@ -520,11 +533,11 @@ class TopoRouter(TopoGear): # The default required directories by Quagga/FRR PRIVATE_DIRS = [ - '/etc/frr', - '/etc/quagga', - '/var/run/frr', - '/var/run/quagga', - '/var/log' + "/etc/frr", + "/etc/quagga", + "/var/run/frr", + "/var/run/quagga", + "/var/log", ] # Router Daemon enumeration definition. @@ -543,20 +556,20 @@ class TopoRouter(TopoGear): RD_BFD = 13 RD_SHARP = 14 RD = { - RD_ZEBRA: 'zebra', - RD_RIP: 'ripd', - RD_RIPNG: 'ripngd', - RD_OSPF: 'ospfd', - RD_OSPF6: 'ospf6d', - RD_ISIS: 'isisd', - RD_BGP: 'bgpd', - RD_PIM: 'pimd', - RD_LDP: 'ldpd', - RD_EIGRP: 'eigrpd', - RD_NHRP: 'nhrpd', - RD_STATIC: 'staticd', - RD_BFD: 'bfdd', - RD_SHARP: 'sharpd', + RD_ZEBRA: "zebra", + RD_RIP: "ripd", + RD_RIPNG: "ripngd", + RD_OSPF: "ospfd", + RD_OSPF6: "ospf6d", + RD_ISIS: "isisd", + RD_BGP: "bgpd", + RD_PIM: "pimd", + RD_LDP: "ldpd", + RD_EIGRP: "eigrpd", + RD_NHRP: "nhrpd", + RD_STATIC: "staticd", + RD_BFD: "bfdd", + RD_SHARP: "sharpd", } def __init__(self, tgen, cls, name, **params): @@ -574,34 +587,34 @@ class TopoRouter(TopoGear): self.name = name self.cls = cls self.options = {} - self.routertype = params.get('routertype', 'frr') - if not params.has_key('privateDirs'): - params['privateDirs'] = self.PRIVATE_DIRS + self.routertype = params.get("routertype", "frr") + if not params.has_key("privateDirs"): + params["privateDirs"] = self.PRIVATE_DIRS - self.options['memleak_path'] = params.get('memleak_path', None) + self.options["memleak_path"] = params.get("memleak_path", None) # Create new log directory - self.logdir = '/tmp/topotests/{}'.format(self.tgen.modname) + self.logdir = "/tmp/topotests/{}".format(self.tgen.modname) # Clean up before starting new log files: avoids removing just created # log files. self._prepare_tmpfiles() # Propagate the router log directory - params['logdir'] = self.logdir + params["logdir"] = self.logdir - #setup the per node directory - dir = '{}/{}'.format(self.logdir, self.name) - os.system('mkdir -p ' + dir) - os.system('chmod -R go+rw /tmp/topotests') + # setup the per node directory + dir = "{}/{}".format(self.logdir, self.name) + os.system("mkdir -p " + dir) + os.system("chmod -R go+rw /tmp/topotests") # Open router log file - logfile = '{0}/{1}.log'.format(self.logdir, name) + logfile = "{0}/{1}.log".format(self.logdir, name) self.logger = logger_config.get_logger(name=name, target=logfile) self.tgen.topo.addNode(self.name, cls=self.cls, **params) def __str__(self): gear = super(TopoRouter, self).__str__() - gear += ' TopoRouter<>' + gear += " TopoRouter<>" return gear def _prepare_tmpfiles(self): @@ -622,9 +635,9 @@ class TopoRouter(TopoGear): os.chmod(self.logdir, 0o1777) # Try to find relevant old logfiles in /tmp and delete them - map(os.remove, glob.glob('{}/{}/*.log'.format(self.logdir, self.name))) + map(os.remove, glob.glob("{}/{}/*.log".format(self.logdir, self.name))) # Remove old core files - map(os.remove, glob.glob('{}/{}/*.dmp'.format(self.logdir, self.name))) + map(os.remove, glob.glob("{}/{}/*.dmp".format(self.logdir, self.name))) def check_capability(self, daemon, param): """ @@ -651,7 +664,7 @@ class TopoRouter(TopoGear): """ Run a series of checks and returns a status string. """ - self.logger.info('checking if daemons are running') + self.logger.info("checking if daemons are running") return self.tgen.net[self.name].checkRouterRunning() def start(self): @@ -663,7 +676,7 @@ class TopoRouter(TopoGear): * Start daemons (e.g. FRR/Quagga) * Configure daemon logging files """ - self.logger.debug('starting') + self.logger.debug("starting") nrouter = self.tgen.net[self.name] result = nrouter.startRouter(self.tgen) @@ -672,15 +685,17 @@ class TopoRouter(TopoGear): for daemon, enabled in nrouter.daemons.iteritems(): if enabled == 0: continue - self.vtysh_cmd('configure terminal\nlog commands\nlog file {}.log'.format( - daemon), daemon=daemon) + self.vtysh_cmd( + "configure terminal\nlog commands\nlog file {}.log".format(daemon), + daemon=daemon, + ) - if result != '': + if result != "": self.tgen.set_error(result) else: # Enable MPLS processing on all interfaces. for interface in self.links.keys(): - set_sysctl(nrouter, 'net.mpls.conf.{}.input'.format(interface), 1) + set_sysctl(nrouter, "net.mpls.conf.{}.input".format(interface), 1) return result @@ -689,7 +704,7 @@ class TopoRouter(TopoGear): Stop router: * Kill daemons """ - self.logger.debug('stopping') + self.logger.debug("stopping") return self.tgen.net[self.name].stopRouter(wait, assertOnError) def vtysh_cmd(self, command, isjson=False, daemon=None): @@ -701,25 +716,26 @@ class TopoRouter(TopoGear): return output for each command. See vtysh_multicmd() for more details. """ # Detect multi line commands - if command.find('\n') != -1: + if command.find("\n") != -1: return self.vtysh_multicmd(command, daemon=daemon) - dparam = '' + dparam = "" if daemon is not None: - dparam += '-d {}'.format(daemon) + dparam += "-d {}".format(daemon) vtysh_command = 'vtysh {} -c "{}" 2>/dev/null'.format(dparam, command) output = self.run(vtysh_command) - self.logger.info('\nvtysh command => {}\nvtysh output <= {}'.format( - command, output)) + self.logger.info( + "\nvtysh command => {}\nvtysh output <= {}".format(command, output) + ) if isjson is False: return output try: return json.loads(output) except ValueError: - logger.warning('vtysh_cmd: failed to convert json output') + logger.warning("vtysh_cmd: failed to convert json output") return {} def vtysh_multicmd(self, commands, pretty_output=True, daemon=None): @@ -734,21 +750,22 @@ class TopoRouter(TopoGear): # Prepare the temporary file that will hold the commands fname = topotest.get_file(commands) - dparam = '' + dparam = "" if daemon is not None: - dparam += '-d {}'.format(daemon) + dparam += "-d {}".format(daemon) # Run the commands and delete the temporary file if pretty_output: - vtysh_command = 'vtysh {} < {}'.format(dparam, fname) + vtysh_command = "vtysh {} < {}".format(dparam, fname) else: - vtysh_command = 'vtysh {} -f {}'.format(dparam, fname) + vtysh_command = "vtysh {} -f {}".format(dparam, fname) res = self.run(vtysh_command) os.unlink(fname) - self.logger.info('\nvtysh command => "{}"\nvtysh output <= "{}"'.format( - vtysh_command, res)) + self.logger.info( + '\nvtysh command => "{}"\nvtysh output <= "{}"'.format(vtysh_command, res) + ) return res @@ -760,27 +777,29 @@ class TopoRouter(TopoGear): NOTE: to run this you must have the environment variable TOPOTESTS_CHECK_MEMLEAK set or memleak_path configured in `pytest.ini`. """ - memleak_file = os.environ.get('TOPOTESTS_CHECK_MEMLEAK') or self.options['memleak_path'] + memleak_file = ( + os.environ.get("TOPOTESTS_CHECK_MEMLEAK") or self.options["memleak_path"] + ) if memleak_file is None: return self.stop() - self.logger.info('running memory leak report') + self.logger.info("running memory leak report") self.tgen.net[self.name].report_memory_leaks(memleak_file, testname) def version_info(self): "Get equipment information from 'show version'." - output = self.vtysh_cmd('show version').split('\n')[0] - columns = topotest.normalize_text(output).split(' ') + output = self.vtysh_cmd("show version").split("\n")[0] + columns = topotest.normalize_text(output).split(" ") try: return { - 'type': columns[0], - 'version': columns[1], + "type": columns[0], + "version": columns[1], } except IndexError: return { - 'type': None, - 'version': None, + "type": None, + "version": None, } def has_version(self, cmpop, version): @@ -802,19 +821,21 @@ class TopoRouter(TopoGear): Compares router type with `rtype`. Returns `True` if the type matches, otherwise `false`. """ - curtype = self.version_info()['type'] + curtype = self.version_info()["type"] return rtype == curtype def has_mpls(self): nrouter = self.tgen.net[self.name] return nrouter.hasmpls + class TopoSwitch(TopoGear): """ Switch abstraction. Has the following properties: * cls: switch class that will be used to instantiate * name: switch name """ + # pylint: disable=too-few-public-methods def __init__(self, tgen, cls, name): @@ -827,9 +848,10 @@ class TopoSwitch(TopoGear): def __str__(self): gear = super(TopoSwitch, self).__str__() - gear += ' TopoSwitch<>' + gear += " TopoSwitch<>" return gear + class TopoHost(TopoGear): "Host abstraction." # pylint: disable=too-few-public-methods @@ -853,18 +875,21 @@ class TopoHost(TopoGear): def __str__(self): gear = super(TopoHost, self).__str__() gear += ' TopoHost<ip="{}",defaultRoute="{}",privateDirs="{}">'.format( - self.options['ip'], self.options['defaultRoute'], - str(self.options['privateDirs'])) + self.options["ip"], + self.options["defaultRoute"], + str(self.options["privateDirs"]), + ) return gear + class TopoExaBGP(TopoHost): "ExaBGP peer abstraction." # pylint: disable=too-few-public-methods PRIVATE_DIRS = [ - '/etc/exabgp', - '/var/run/exabgp', - '/var/log', + "/etc/exabgp", + "/var/run/exabgp", + "/var/log", ] def __init__(self, tgen, name, **params): @@ -878,13 +903,13 @@ class TopoExaBGP(TopoHost): has a privateDirs already defined and contains functions to handle ExaBGP things. """ - params['privateDirs'] = self.PRIVATE_DIRS + params["privateDirs"] = self.PRIVATE_DIRS super(TopoExaBGP, self).__init__(tgen, name, **params) self.tgen.topo.addHost(name, **params) def __str__(self): gear = super(TopoExaBGP, self).__str__() - gear += ' TopoExaBGP<>'.format() + gear += " TopoExaBGP<>".format() return gear def start(self, peer_dir, env_file=None): @@ -895,22 +920,22 @@ class TopoExaBGP(TopoHost): * Make all python files runnable * Run ExaBGP with env file `env_file` and configuration peer*/exabgp.cfg """ - self.run('mkdir /etc/exabgp') - self.run('chmod 755 /etc/exabgp') - self.run('cp {}/* /etc/exabgp/'.format(peer_dir)) + self.run("mkdir /etc/exabgp") + self.run("chmod 755 /etc/exabgp") + self.run("cp {}/* /etc/exabgp/".format(peer_dir)) if env_file is not None: - self.run('cp {} /etc/exabgp/exabgp.env'.format(env_file)) - self.run('chmod 644 /etc/exabgp/*') - self.run('chmod a+x /etc/exabgp/*.py') - self.run('chown -R exabgp:exabgp /etc/exabgp') - output = self.run('exabgp -e /etc/exabgp/exabgp.env /etc/exabgp/exabgp.cfg') + self.run("cp {} /etc/exabgp/exabgp.env".format(env_file)) + self.run("chmod 644 /etc/exabgp/*") + self.run("chmod a+x /etc/exabgp/*.py") + self.run("chown -R exabgp:exabgp /etc/exabgp") + output = self.run("exabgp -e /etc/exabgp/exabgp.env /etc/exabgp/exabgp.cfg") if output == None or len(output) == 0: - output = '<none>' - logger.info('{} exabgp started, output={}'.format(self.name, output)) + output = "<none>" + logger.info("{} exabgp started, output={}".format(self.name, output)) def stop(self, wait=True, assertOnError=True): "Stop ExaBGP peer and kill the daemon" - self.run('kill `cat /var/run/exabgp/exabgp.pid`') + self.run("kill `cat /var/run/exabgp/exabgp.pid`") return "" @@ -928,160 +953,189 @@ def diagnose_env_linux(): ret = True # Test log path exists before installing handler. - if not os.path.isdir('/tmp'): - logger.warning('could not find /tmp for logs') + if not os.path.isdir("/tmp"): + logger.warning("could not find /tmp for logs") else: - os.system('mkdir /tmp/topotests') + os.system("mkdir /tmp/topotests") # Log diagnostics to file so it can be examined later. - fhandler = logging.FileHandler(filename='/tmp/topotests/diagnostics.txt') + fhandler = logging.FileHandler(filename="/tmp/topotests/diagnostics.txt") fhandler.setLevel(logging.DEBUG) fhandler.setFormatter( - logging.Formatter(fmt='%(asctime)s %(levelname)s: %(message)s') + logging.Formatter(fmt="%(asctime)s %(levelname)s: %(message)s") ) logger.addHandler(fhandler) - logger.info('Running environment diagnostics') + logger.info("Running environment diagnostics") # Load configuration config = configparser.ConfigParser(tgen_defaults) - pytestini_path = os.path.join(CWD, '../pytest.ini') + pytestini_path = os.path.join(CWD, "../pytest.ini") config.read(pytestini_path) # Assert that we are running as root if os.getuid() != 0: - logger.error('you must run topotest as root') + logger.error("you must run topotest as root") ret = False # Assert that we have mininet - if os.system('which mn >/dev/null 2>/dev/null') != 0: - logger.error('could not find mininet binary (mininet is not installed)') + if os.system("which mn >/dev/null 2>/dev/null") != 0: + logger.error("could not find mininet binary (mininet is not installed)") ret = False # Assert that we have iproute installed - if os.system('which ip >/dev/null 2>/dev/null') != 0: - logger.error('could not find ip binary (iproute is not installed)') + if os.system("which ip >/dev/null 2>/dev/null") != 0: + logger.error("could not find ip binary (iproute is not installed)") ret = False # Assert that we have gdb installed - if os.system('which gdb >/dev/null 2>/dev/null') != 0: - logger.error('could not find gdb binary (gdb is not installed)') + if os.system("which gdb >/dev/null 2>/dev/null") != 0: + logger.error("could not find gdb binary (gdb is not installed)") ret = False # Assert that FRR utilities exist - frrdir = config.get('topogen', 'frrdir') + frrdir = config.get("topogen", "frrdir") hasfrr = False if not os.path.isdir(frrdir): - logger.error('could not find {} directory'.format(frrdir)) + logger.error("could not find {} directory".format(frrdir)) ret = False else: hasfrr = True try: - pwd.getpwnam('frr')[2] + pwd.getpwnam("frr")[2] except KeyError: logger.warning('could not find "frr" user') try: - grp.getgrnam('frr')[2] + grp.getgrnam("frr")[2] except KeyError: logger.warning('could not find "frr" group') try: - if 'frr' not in grp.getgrnam('frrvty').gr_mem: - logger.error('"frr" user and group exist, but user is not under "frrvty"') + if "frr" not in grp.getgrnam("frrvty").gr_mem: + logger.error( + '"frr" user and group exist, but user is not under "frrvty"' + ) except KeyError: logger.warning('could not find "frrvty" group') - for fname in ['zebra', 'ospfd', 'ospf6d', 'bgpd', 'ripd', 'ripngd', - 'isisd', 'pimd', 'ldpd']: + for fname in [ + "zebra", + "ospfd", + "ospf6d", + "bgpd", + "ripd", + "ripngd", + "isisd", + "pimd", + "ldpd", + ]: path = os.path.join(frrdir, fname) if not os.path.isfile(path): # LDPd is an exception - if fname == 'ldpd': - logger.info('could not find {} in {}'.format(fname, frrdir) + - '(LDPd tests will not run)') + if fname == "ldpd": + logger.info( + "could not find {} in {}".format(fname, frrdir) + + "(LDPd tests will not run)" + ) continue - logger.warning('could not find {} in {}'.format(fname, frrdir)) + logger.warning("could not find {} in {}".format(fname, frrdir)) ret = False else: - if fname != 'zebra': + if fname != "zebra": continue - os.system( - '{} -v 2>&1 >/tmp/topotests/frr_zebra.txt'.format(path) - ) + os.system("{} -v 2>&1 >/tmp/topotests/frr_zebra.txt".format(path)) # Assert that Quagga utilities exist - quaggadir = config.get('topogen', 'quaggadir') + quaggadir = config.get("topogen", "quaggadir") if hasfrr: # if we have frr, don't check for quagga pass elif not os.path.isdir(quaggadir): - logger.info('could not find {} directory (quagga tests will not run)'.format(quaggadir)) + logger.info( + "could not find {} directory (quagga tests will not run)".format(quaggadir) + ) else: ret = True try: - pwd.getpwnam('quagga')[2] + pwd.getpwnam("quagga")[2] except KeyError: logger.info('could not find "quagga" user') try: - grp.getgrnam('quagga')[2] + grp.getgrnam("quagga")[2] except KeyError: logger.info('could not find "quagga" group') try: - if 'quagga' not in grp.getgrnam('quaggavty').gr_mem: - logger.error('"quagga" user and group exist, but user is not under "quaggavty"') + if "quagga" not in grp.getgrnam("quaggavty").gr_mem: + logger.error( + '"quagga" user and group exist, but user is not under "quaggavty"' + ) except KeyError: logger.warning('could not find "quaggavty" group') - for fname in ['zebra', 'ospfd', 'ospf6d', 'bgpd', 'ripd', 'ripngd', - 'isisd', 'pimd']: + for fname in [ + "zebra", + "ospfd", + "ospf6d", + "bgpd", + "ripd", + "ripngd", + "isisd", + "pimd", + ]: path = os.path.join(quaggadir, fname) if not os.path.isfile(path): - logger.warning('could not find {} in {}'.format(fname, quaggadir)) + logger.warning("could not find {} in {}".format(fname, quaggadir)) ret = False else: - if fname != 'zebra': + if fname != "zebra": continue - os.system( - '{} -v 2>&1 >/tmp/topotests/quagga_zebra.txt'.format(path) - ) + os.system("{} -v 2>&1 >/tmp/topotests/quagga_zebra.txt".format(path)) # Test MPLS availability krel = platform.release() - if topotest.version_cmp(krel, '4.5') < 0: - logger.info('LDPd tests will not run (have kernel "{}", but it requires 4.5)'.format(krel)) + if topotest.version_cmp(krel, "4.5") < 0: + logger.info( + 'LDPd tests will not run (have kernel "{}", but it requires 4.5)'.format( + krel + ) + ) # Test for MPLS Kernel modules available - if not topotest.module_present('mpls-router', load=False) != 0: - logger.info('LDPd tests will not run (missing mpls-router kernel module)') - if not topotest.module_present('mpls-iptunnel', load=False) != 0: - logger.info('LDPd tests will not run (missing mpls-iptunnel kernel module)') + if not topotest.module_present("mpls-router", load=False) != 0: + logger.info("LDPd tests will not run (missing mpls-router kernel module)") + if not topotest.module_present("mpls-iptunnel", load=False) != 0: + logger.info("LDPd tests will not run (missing mpls-iptunnel kernel module)") # TODO remove me when we start supporting exabgp >= 4 try: - output = subprocess.check_output(['exabgp', '-v']) - line = output.split('\n')[0] - version = line.split(' ')[2] - if topotest.version_cmp(version, '4') >= 0: - logger.warning('BGP topologies are still using exabgp version 3, expect failures') + output = subprocess.check_output(["exabgp", "-v"]) + line = output.split("\n")[0] + version = line.split(" ")[2] + if topotest.version_cmp(version, "4") >= 0: + logger.warning( + "BGP topologies are still using exabgp version 3, expect failures" + ) # We want to catch all exceptions # pylint: disable=W0702 except: - logger.warning('failed to find exabgp or returned error') + logger.warning("failed to find exabgp or returned error") # After we logged the output to file, remove the handler. logger.removeHandler(fhandler) return ret + def diagnose_env_freebsd(): return True + def diagnose_env(): if sys.platform.startswith("linux"): return diagnose_env_linux() |
