summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/topogen.py
diff options
context:
space:
mode:
authorreformat <reformat@nobody.nobody>2020-04-03 14:05:24 +0300
committerDonatas Abraitis <donatas.abraitis@gmail.com>2020-04-03 19:41:28 +0300
commit787e762445d50ca5b52fafcf8dd6de08ab90916f (patch)
treeafaad3d41a83da180d5fc8bbc7b23d02da7c4dbd /tests/topotests/lib/topogen.py
parentcd05906c41aaaf94e06fa9d14a7634bdffd99ed0 (diff)
tests: Run python formatter (black) for topotests
Mostly ' => ", whitespace changes. Using https://github.com/psf/black Signed-off-by: reformat <reformat@nobody.nobody>
Diffstat (limited to 'tests/topotests/lib/topogen.py')
-rw-r--r--tests/topotests/lib/topogen.py444
1 files changed, 249 insertions, 195 deletions
diff --git a/tests/topotests/lib/topogen.py b/tests/topotests/lib/topogen.py
index 6859f5a076..6a6bbc7c78 100644
--- a/tests/topotests/lib/topogen.py
+++ b/tests/topotests/lib/topogen.py
@@ -70,6 +70,7 @@ CWD = os.path.dirname(os.path.realpath(__file__))
# all test functions without declaring a test local variable.
global_tgen = None
+
def get_topogen(topo=None):
"""
Helper function to retrieve Topogen. Must be called with `topo` when called
@@ -79,31 +80,34 @@ def get_topogen(topo=None):
global_tgen.topo = topo
return global_tgen
+
def set_topogen(tgen):
"Helper function to set Topogen"
# pylint: disable=W0603
global global_tgen
global_tgen = tgen
+
#
# Main class: topology builder
#
# Topogen configuration defaults
tgen_defaults = {
- 'verbosity': 'info',
- 'frrdir': '/usr/lib/frr',
- 'quaggadir': '/usr/lib/quagga',
- 'routertype': 'frr',
- 'memleak_path': None,
+ "verbosity": "info",
+ "frrdir": "/usr/lib/frr",
+ "quaggadir": "/usr/lib/quagga",
+ "routertype": "frr",
+ "memleak_path": None,
}
+
class Topogen(object):
"A topology test builder helper."
- CONFIG_SECTION = 'topogen'
+ CONFIG_SECTION = "topogen"
- def __init__(self, cls, modname='unnamed'):
+ def __init__(self, cls, modname="unnamed"):
"""
Topogen initialization function, takes the following arguments:
* `cls`: the topology class that is child of mininet.topo
@@ -117,16 +121,16 @@ class Topogen(object):
self.switchn = 1
self.modname = modname
self.errorsd = {}
- self.errors = ''
+ self.errors = ""
self.peern = 1
self._init_topo(cls)
- logger.info('loading topology: {}'.format(self.modname))
+ logger.info("loading topology: {}".format(self.modname))
@staticmethod
def _mininet_reset():
"Reset the mininet environment"
# Clean up the mininet environment
- os.system('mn -c > /dev/null 2>&1')
+ os.system("mn -c > /dev/null 2>&1")
def _init_topo(self, cls):
"""
@@ -138,10 +142,10 @@ class Topogen(object):
# Test for MPLS Kernel modules available
self.hasmpls = False
- if not topotest.module_present('mpls-router'):
- logger.info('MPLS tests will not run (missing mpls-router kernel module)')
- elif not topotest.module_present('mpls-iptunnel'):
- logger.info('MPLS tests will not run (missing mpls-iptunnel kernel module)')
+ if not topotest.module_present("mpls-router"):
+ logger.info("MPLS tests will not run (missing mpls-router kernel module)")
+ elif not topotest.module_present("mpls-iptunnel"):
+ logger.info("MPLS tests will not run (missing mpls-iptunnel kernel module)")
else:
self.hasmpls = True
# Load the default topology configurations
@@ -160,7 +164,7 @@ class Topogen(object):
topotests.
"""
self.config = configparser.ConfigParser(tgen_defaults)
- pytestini_path = os.path.join(CWD, '../pytest.ini')
+ pytestini_path = os.path.join(CWD, "../pytest.ini")
self.config.read(pytestini_path)
def add_router(self, name=None, cls=topotest.Router, **params):
@@ -173,15 +177,15 @@ class Topogen(object):
Returns a TopoRouter.
"""
if name is None:
- name = 'r{}'.format(self.routern)
+ name = "r{}".format(self.routern)
if name in self.gears:
- raise KeyError('router already exists')
+ raise KeyError("router already exists")
- params['frrdir'] = self.config.get(self.CONFIG_SECTION, 'frrdir')
- params['quaggadir'] = self.config.get(self.CONFIG_SECTION, 'quaggadir')
- params['memleak_path'] = self.config.get(self.CONFIG_SECTION, 'memleak_path')
- if not params.has_key('routertype'):
- params['routertype'] = self.config.get(self.CONFIG_SECTION, 'routertype')
+ params["frrdir"] = self.config.get(self.CONFIG_SECTION, "frrdir")
+ params["quaggadir"] = self.config.get(self.CONFIG_SECTION, "quaggadir")
+ params["memleak_path"] = self.config.get(self.CONFIG_SECTION, "memleak_path")
+ if not params.has_key("routertype"):
+ params["routertype"] = self.config.get(self.CONFIG_SECTION, "routertype")
self.gears[name] = TopoRouter(self, cls, name, **params)
self.routern += 1
@@ -195,9 +199,9 @@ class Topogen(object):
Returns the switch name and number.
"""
if name is None:
- name = 's{}'.format(self.switchn)
+ name = "s{}".format(self.switchn)
if name in self.gears:
- raise KeyError('switch already exists')
+ raise KeyError("switch already exists")
self.gears[name] = TopoSwitch(self, cls, name)
self.switchn += 1
@@ -211,9 +215,9 @@ class Topogen(object):
* `defaultRoute`: the peer default route (e.g. 'via 1.2.3.1')
"""
if name is None:
- name = 'peer{}'.format(self.peern)
+ name = "peer{}".format(self.peern)
if name in self.gears:
- raise KeyError('exabgp peer already exists')
+ raise KeyError("exabgp peer already exists")
self.gears[name] = TopoExaBGP(self, name, ip=ip, defaultRoute=defaultRoute)
self.peern += 1
@@ -228,9 +232,9 @@ class Topogen(object):
* TopoSwitch
"""
if not isinstance(node1, TopoGear):
- raise ValueError('invalid node1 type')
+ raise ValueError("invalid node1 type")
if not isinstance(node2, TopoGear):
- raise ValueError('invalid node2 type')
+ raise ValueError("invalid node2 type")
if ifname1 is None:
ifname1 = node1.new_link()
@@ -239,8 +243,7 @@ class Topogen(object):
node1.register_link(ifname1, node2, ifname2)
node2.register_link(ifname2, node1, ifname1)
- self.topo.addLink(node1.name, node2.name,
- intfName1=ifname1, intfName2=ifname2)
+ self.topo.addLink(node1.name, node2.name, intfName1=ifname1, intfName2=ifname2)
def get_gears(self, geartype):
"""
@@ -262,8 +265,11 @@ class Topogen(object):
# Do stuff
```
"""
- return dict((name, gear) for name, gear in self.gears.iteritems()
- if isinstance(gear, geartype))
+ return dict(
+ (name, gear)
+ for name, gear in self.gears.iteritems()
+ if isinstance(gear, geartype)
+ )
def routers(self):
"""
@@ -291,16 +297,16 @@ class Topogen(object):
"""
# If log_level is not specified use the configuration.
if log_level is None:
- log_level = self.config.get(self.CONFIG_SECTION, 'verbosity')
+ log_level = self.config.get(self.CONFIG_SECTION, "verbosity")
# Set python logger level
logger_config.set_log_level(log_level)
# Run mininet
- if log_level == 'debug':
+ if log_level == "debug":
setLogLevel(log_level)
- logger.info('starting topology: {}'.format(self.modname))
+ logger.info("starting topology: {}".format(self.modname))
self.net.start()
def start_router(self, router=None):
@@ -326,7 +332,7 @@ class Topogen(object):
first is a simple kill with no sleep, the second will sleep if not
killed and try with a different signal.
"""
- logger.info('stopping topology: {}'.format(self.modname))
+ logger.info("stopping topology: {}".format(self.modname))
errors = ""
for gear in self.gears.values():
gear.stop(False, False)
@@ -344,7 +350,8 @@ class Topogen(object):
"""
if not sys.stdin.isatty():
raise EnvironmentError(
- 'you must run pytest with \'-s\' in order to use mininet CLI')
+ "you must run pytest with '-s' in order to use mininet CLI"
+ )
CLI(self.net)
@@ -354,8 +361,9 @@ class Topogen(object):
if self.routers_have_failure():
return False
- memleak_file = (os.environ.get('TOPOTESTS_CHECK_MEMLEAK') or
- self.config.get(self.CONFIG_SECTION, 'memleak_path'))
+ memleak_file = os.environ.get("TOPOTESTS_CHECK_MEMLEAK") or self.config.get(
+ self.CONFIG_SECTION, "memleak_path"
+ )
if memleak_file is None:
return False
return True
@@ -382,7 +390,7 @@ class Topogen(object):
code = len(self.errorsd)
self.errorsd[code] = message
- self.errors += '\n{}: {}'.format(code, message)
+ self.errors += "\n{}: {}".format(code, message)
def has_errors(self):
"Returns whether errors exist or not."
@@ -393,23 +401,25 @@ class Topogen(object):
if self.has_errors():
return True
- errors = ''
+ errors = ""
router_list = self.routers().values()
for router in router_list:
result = router.check_router_running()
- if result != '':
- errors += result + '\n'
+ if result != "":
+ errors += result + "\n"
- if errors != '':
- self.set_error(errors, 'router_error')
+ if errors != "":
+ self.set_error(errors, "router_error")
assert False, errors
return True
return False
+
#
# Topology gears (equipment)
#
+
class TopoGear(object):
"Abstract class for type checking"
@@ -421,11 +431,11 @@ class TopoGear(object):
self.linkn = 0
def __str__(self):
- links = ''
+ links = ""
for myif, dest in self.links.iteritems():
_, destif = dest
- if links != '':
- links += ','
+ if links != "":
+ links += ","
links += '"{}"<->"{}"'.format(myif, destif)
return 'TopoGear<name="{}",links=[{}]>'.format(self.name, links)
@@ -462,20 +472,22 @@ class TopoGear(object):
enabled: whether we should enable or disable the interface
"""
if myif not in self.links.keys():
- raise KeyError('interface doesn\'t exists')
+ raise KeyError("interface doesn't exists")
if enabled is True:
- operation = 'up'
+ operation = "up"
else:
- operation = 'down'
+ operation = "down"
- logger.info('setting node "{}" link "{}" to state "{}"'.format(
- self.name, myif, operation
- ))
- extract=''
+ logger.info(
+ 'setting node "{}" link "{}" to state "{}"'.format(
+ self.name, myif, operation
+ )
+ )
+ extract = ""
if netns is not None:
- extract = 'ip netns exec {} '.format(netns)
- return self.run('{}ip link set dev {} {}'.format(extract, myif, operation))
+ extract = "ip netns exec {} ".format(netns)
+ return self.run("{}ip link set dev {} {}".format(extract, myif, operation))
def peer_link_enable(self, myif, enabled=True, netns=None):
"""
@@ -487,7 +499,7 @@ class TopoGear(object):
peer disables their interface our interface status changes to no link.
"""
if myif not in self.links.keys():
- raise KeyError('interface doesn\'t exists')
+ raise KeyError("interface doesn't exists")
node, nodeif = self.links[myif]
node.link_enable(nodeif, enabled, netns)
@@ -498,7 +510,7 @@ class TopoGear(object):
NOTE: This function should only be called by Topogen.
"""
- ifname = '{}-eth{}'.format(self.name, self.linkn)
+ ifname = "{}-eth{}".format(self.name, self.linkn)
self.linkn += 1
return ifname
@@ -509,10 +521,11 @@ class TopoGear(object):
NOTE: This function should only be called by Topogen.
"""
if myif in self.links.keys():
- raise KeyError('interface already exists')
+ raise KeyError("interface already exists")
self.links[myif] = (node, nodeif)
+
class TopoRouter(TopoGear):
"""
Router abstraction.
@@ -520,11 +533,11 @@ class TopoRouter(TopoGear):
# The default required directories by Quagga/FRR
PRIVATE_DIRS = [
- '/etc/frr',
- '/etc/quagga',
- '/var/run/frr',
- '/var/run/quagga',
- '/var/log'
+ "/etc/frr",
+ "/etc/quagga",
+ "/var/run/frr",
+ "/var/run/quagga",
+ "/var/log",
]
# Router Daemon enumeration definition.
@@ -543,20 +556,20 @@ class TopoRouter(TopoGear):
RD_BFD = 13
RD_SHARP = 14
RD = {
- RD_ZEBRA: 'zebra',
- RD_RIP: 'ripd',
- RD_RIPNG: 'ripngd',
- RD_OSPF: 'ospfd',
- RD_OSPF6: 'ospf6d',
- RD_ISIS: 'isisd',
- RD_BGP: 'bgpd',
- RD_PIM: 'pimd',
- RD_LDP: 'ldpd',
- RD_EIGRP: 'eigrpd',
- RD_NHRP: 'nhrpd',
- RD_STATIC: 'staticd',
- RD_BFD: 'bfdd',
- RD_SHARP: 'sharpd',
+ RD_ZEBRA: "zebra",
+ RD_RIP: "ripd",
+ RD_RIPNG: "ripngd",
+ RD_OSPF: "ospfd",
+ RD_OSPF6: "ospf6d",
+ RD_ISIS: "isisd",
+ RD_BGP: "bgpd",
+ RD_PIM: "pimd",
+ RD_LDP: "ldpd",
+ RD_EIGRP: "eigrpd",
+ RD_NHRP: "nhrpd",
+ RD_STATIC: "staticd",
+ RD_BFD: "bfdd",
+ RD_SHARP: "sharpd",
}
def __init__(self, tgen, cls, name, **params):
@@ -574,34 +587,34 @@ class TopoRouter(TopoGear):
self.name = name
self.cls = cls
self.options = {}
- self.routertype = params.get('routertype', 'frr')
- if not params.has_key('privateDirs'):
- params['privateDirs'] = self.PRIVATE_DIRS
+ self.routertype = params.get("routertype", "frr")
+ if not params.has_key("privateDirs"):
+ params["privateDirs"] = self.PRIVATE_DIRS
- self.options['memleak_path'] = params.get('memleak_path', None)
+ self.options["memleak_path"] = params.get("memleak_path", None)
# Create new log directory
- self.logdir = '/tmp/topotests/{}'.format(self.tgen.modname)
+ self.logdir = "/tmp/topotests/{}".format(self.tgen.modname)
# Clean up before starting new log files: avoids removing just created
# log files.
self._prepare_tmpfiles()
# Propagate the router log directory
- params['logdir'] = self.logdir
+ params["logdir"] = self.logdir
- #setup the per node directory
- dir = '{}/{}'.format(self.logdir, self.name)
- os.system('mkdir -p ' + dir)
- os.system('chmod -R go+rw /tmp/topotests')
+ # setup the per node directory
+ dir = "{}/{}".format(self.logdir, self.name)
+ os.system("mkdir -p " + dir)
+ os.system("chmod -R go+rw /tmp/topotests")
# Open router log file
- logfile = '{0}/{1}.log'.format(self.logdir, name)
+ logfile = "{0}/{1}.log".format(self.logdir, name)
self.logger = logger_config.get_logger(name=name, target=logfile)
self.tgen.topo.addNode(self.name, cls=self.cls, **params)
def __str__(self):
gear = super(TopoRouter, self).__str__()
- gear += ' TopoRouter<>'
+ gear += " TopoRouter<>"
return gear
def _prepare_tmpfiles(self):
@@ -622,9 +635,9 @@ class TopoRouter(TopoGear):
os.chmod(self.logdir, 0o1777)
# Try to find relevant old logfiles in /tmp and delete them
- map(os.remove, glob.glob('{}/{}/*.log'.format(self.logdir, self.name)))
+ map(os.remove, glob.glob("{}/{}/*.log".format(self.logdir, self.name)))
# Remove old core files
- map(os.remove, glob.glob('{}/{}/*.dmp'.format(self.logdir, self.name)))
+ map(os.remove, glob.glob("{}/{}/*.dmp".format(self.logdir, self.name)))
def check_capability(self, daemon, param):
"""
@@ -651,7 +664,7 @@ class TopoRouter(TopoGear):
"""
Run a series of checks and returns a status string.
"""
- self.logger.info('checking if daemons are running')
+ self.logger.info("checking if daemons are running")
return self.tgen.net[self.name].checkRouterRunning()
def start(self):
@@ -663,7 +676,7 @@ class TopoRouter(TopoGear):
* Start daemons (e.g. FRR/Quagga)
* Configure daemon logging files
"""
- self.logger.debug('starting')
+ self.logger.debug("starting")
nrouter = self.tgen.net[self.name]
result = nrouter.startRouter(self.tgen)
@@ -672,15 +685,17 @@ class TopoRouter(TopoGear):
for daemon, enabled in nrouter.daemons.iteritems():
if enabled == 0:
continue
- self.vtysh_cmd('configure terminal\nlog commands\nlog file {}.log'.format(
- daemon), daemon=daemon)
+ self.vtysh_cmd(
+ "configure terminal\nlog commands\nlog file {}.log".format(daemon),
+ daemon=daemon,
+ )
- if result != '':
+ if result != "":
self.tgen.set_error(result)
else:
# Enable MPLS processing on all interfaces.
for interface in self.links.keys():
- set_sysctl(nrouter, 'net.mpls.conf.{}.input'.format(interface), 1)
+ set_sysctl(nrouter, "net.mpls.conf.{}.input".format(interface), 1)
return result
@@ -689,7 +704,7 @@ class TopoRouter(TopoGear):
Stop router:
* Kill daemons
"""
- self.logger.debug('stopping')
+ self.logger.debug("stopping")
return self.tgen.net[self.name].stopRouter(wait, assertOnError)
def vtysh_cmd(self, command, isjson=False, daemon=None):
@@ -701,25 +716,26 @@ class TopoRouter(TopoGear):
return output for each command. See vtysh_multicmd() for more details.
"""
# Detect multi line commands
- if command.find('\n') != -1:
+ if command.find("\n") != -1:
return self.vtysh_multicmd(command, daemon=daemon)
- dparam = ''
+ dparam = ""
if daemon is not None:
- dparam += '-d {}'.format(daemon)
+ dparam += "-d {}".format(daemon)
vtysh_command = 'vtysh {} -c "{}" 2>/dev/null'.format(dparam, command)
output = self.run(vtysh_command)
- self.logger.info('\nvtysh command => {}\nvtysh output <= {}'.format(
- command, output))
+ self.logger.info(
+ "\nvtysh command => {}\nvtysh output <= {}".format(command, output)
+ )
if isjson is False:
return output
try:
return json.loads(output)
except ValueError:
- logger.warning('vtysh_cmd: failed to convert json output')
+ logger.warning("vtysh_cmd: failed to convert json output")
return {}
def vtysh_multicmd(self, commands, pretty_output=True, daemon=None):
@@ -734,21 +750,22 @@ class TopoRouter(TopoGear):
# Prepare the temporary file that will hold the commands
fname = topotest.get_file(commands)
- dparam = ''
+ dparam = ""
if daemon is not None:
- dparam += '-d {}'.format(daemon)
+ dparam += "-d {}".format(daemon)
# Run the commands and delete the temporary file
if pretty_output:
- vtysh_command = 'vtysh {} < {}'.format(dparam, fname)
+ vtysh_command = "vtysh {} < {}".format(dparam, fname)
else:
- vtysh_command = 'vtysh {} -f {}'.format(dparam, fname)
+ vtysh_command = "vtysh {} -f {}".format(dparam, fname)
res = self.run(vtysh_command)
os.unlink(fname)
- self.logger.info('\nvtysh command => "{}"\nvtysh output <= "{}"'.format(
- vtysh_command, res))
+ self.logger.info(
+ '\nvtysh command => "{}"\nvtysh output <= "{}"'.format(vtysh_command, res)
+ )
return res
@@ -760,27 +777,29 @@ class TopoRouter(TopoGear):
NOTE: to run this you must have the environment variable
TOPOTESTS_CHECK_MEMLEAK set or memleak_path configured in `pytest.ini`.
"""
- memleak_file = os.environ.get('TOPOTESTS_CHECK_MEMLEAK') or self.options['memleak_path']
+ memleak_file = (
+ os.environ.get("TOPOTESTS_CHECK_MEMLEAK") or self.options["memleak_path"]
+ )
if memleak_file is None:
return
self.stop()
- self.logger.info('running memory leak report')
+ self.logger.info("running memory leak report")
self.tgen.net[self.name].report_memory_leaks(memleak_file, testname)
def version_info(self):
"Get equipment information from 'show version'."
- output = self.vtysh_cmd('show version').split('\n')[0]
- columns = topotest.normalize_text(output).split(' ')
+ output = self.vtysh_cmd("show version").split("\n")[0]
+ columns = topotest.normalize_text(output).split(" ")
try:
return {
- 'type': columns[0],
- 'version': columns[1],
+ "type": columns[0],
+ "version": columns[1],
}
except IndexError:
return {
- 'type': None,
- 'version': None,
+ "type": None,
+ "version": None,
}
def has_version(self, cmpop, version):
@@ -802,19 +821,21 @@ class TopoRouter(TopoGear):
Compares router type with `rtype`. Returns `True` if the type matches,
otherwise `false`.
"""
- curtype = self.version_info()['type']
+ curtype = self.version_info()["type"]
return rtype == curtype
def has_mpls(self):
nrouter = self.tgen.net[self.name]
return nrouter.hasmpls
+
class TopoSwitch(TopoGear):
"""
Switch abstraction. Has the following properties:
* cls: switch class that will be used to instantiate
* name: switch name
"""
+
# pylint: disable=too-few-public-methods
def __init__(self, tgen, cls, name):
@@ -827,9 +848,10 @@ class TopoSwitch(TopoGear):
def __str__(self):
gear = super(TopoSwitch, self).__str__()
- gear += ' TopoSwitch<>'
+ gear += " TopoSwitch<>"
return gear
+
class TopoHost(TopoGear):
"Host abstraction."
# pylint: disable=too-few-public-methods
@@ -853,18 +875,21 @@ class TopoHost(TopoGear):
def __str__(self):
gear = super(TopoHost, self).__str__()
gear += ' TopoHost<ip="{}",defaultRoute="{}",privateDirs="{}">'.format(
- self.options['ip'], self.options['defaultRoute'],
- str(self.options['privateDirs']))
+ self.options["ip"],
+ self.options["defaultRoute"],
+ str(self.options["privateDirs"]),
+ )
return gear
+
class TopoExaBGP(TopoHost):
"ExaBGP peer abstraction."
# pylint: disable=too-few-public-methods
PRIVATE_DIRS = [
- '/etc/exabgp',
- '/var/run/exabgp',
- '/var/log',
+ "/etc/exabgp",
+ "/var/run/exabgp",
+ "/var/log",
]
def __init__(self, tgen, name, **params):
@@ -878,13 +903,13 @@ class TopoExaBGP(TopoHost):
has a privateDirs already defined and contains functions to handle ExaBGP
things.
"""
- params['privateDirs'] = self.PRIVATE_DIRS
+ params["privateDirs"] = self.PRIVATE_DIRS
super(TopoExaBGP, self).__init__(tgen, name, **params)
self.tgen.topo.addHost(name, **params)
def __str__(self):
gear = super(TopoExaBGP, self).__str__()
- gear += ' TopoExaBGP<>'.format()
+ gear += " TopoExaBGP<>".format()
return gear
def start(self, peer_dir, env_file=None):
@@ -895,22 +920,22 @@ class TopoExaBGP(TopoHost):
* Make all python files runnable
* Run ExaBGP with env file `env_file` and configuration peer*/exabgp.cfg
"""
- self.run('mkdir /etc/exabgp')
- self.run('chmod 755 /etc/exabgp')
- self.run('cp {}/* /etc/exabgp/'.format(peer_dir))
+ self.run("mkdir /etc/exabgp")
+ self.run("chmod 755 /etc/exabgp")
+ self.run("cp {}/* /etc/exabgp/".format(peer_dir))
if env_file is not None:
- self.run('cp {} /etc/exabgp/exabgp.env'.format(env_file))
- self.run('chmod 644 /etc/exabgp/*')
- self.run('chmod a+x /etc/exabgp/*.py')
- self.run('chown -R exabgp:exabgp /etc/exabgp')
- output = self.run('exabgp -e /etc/exabgp/exabgp.env /etc/exabgp/exabgp.cfg')
+ self.run("cp {} /etc/exabgp/exabgp.env".format(env_file))
+ self.run("chmod 644 /etc/exabgp/*")
+ self.run("chmod a+x /etc/exabgp/*.py")
+ self.run("chown -R exabgp:exabgp /etc/exabgp")
+ output = self.run("exabgp -e /etc/exabgp/exabgp.env /etc/exabgp/exabgp.cfg")
if output == None or len(output) == 0:
- output = '<none>'
- logger.info('{} exabgp started, output={}'.format(self.name, output))
+ output = "<none>"
+ logger.info("{} exabgp started, output={}".format(self.name, output))
def stop(self, wait=True, assertOnError=True):
"Stop ExaBGP peer and kill the daemon"
- self.run('kill `cat /var/run/exabgp/exabgp.pid`')
+ self.run("kill `cat /var/run/exabgp/exabgp.pid`")
return ""
@@ -928,160 +953,189 @@ def diagnose_env_linux():
ret = True
# Test log path exists before installing handler.
- if not os.path.isdir('/tmp'):
- logger.warning('could not find /tmp for logs')
+ if not os.path.isdir("/tmp"):
+ logger.warning("could not find /tmp for logs")
else:
- os.system('mkdir /tmp/topotests')
+ os.system("mkdir /tmp/topotests")
# Log diagnostics to file so it can be examined later.
- fhandler = logging.FileHandler(filename='/tmp/topotests/diagnostics.txt')
+ fhandler = logging.FileHandler(filename="/tmp/topotests/diagnostics.txt")
fhandler.setLevel(logging.DEBUG)
fhandler.setFormatter(
- logging.Formatter(fmt='%(asctime)s %(levelname)s: %(message)s')
+ logging.Formatter(fmt="%(asctime)s %(levelname)s: %(message)s")
)
logger.addHandler(fhandler)
- logger.info('Running environment diagnostics')
+ logger.info("Running environment diagnostics")
# Load configuration
config = configparser.ConfigParser(tgen_defaults)
- pytestini_path = os.path.join(CWD, '../pytest.ini')
+ pytestini_path = os.path.join(CWD, "../pytest.ini")
config.read(pytestini_path)
# Assert that we are running as root
if os.getuid() != 0:
- logger.error('you must run topotest as root')
+ logger.error("you must run topotest as root")
ret = False
# Assert that we have mininet
- if os.system('which mn >/dev/null 2>/dev/null') != 0:
- logger.error('could not find mininet binary (mininet is not installed)')
+ if os.system("which mn >/dev/null 2>/dev/null") != 0:
+ logger.error("could not find mininet binary (mininet is not installed)")
ret = False
# Assert that we have iproute installed
- if os.system('which ip >/dev/null 2>/dev/null') != 0:
- logger.error('could not find ip binary (iproute is not installed)')
+ if os.system("which ip >/dev/null 2>/dev/null") != 0:
+ logger.error("could not find ip binary (iproute is not installed)")
ret = False
# Assert that we have gdb installed
- if os.system('which gdb >/dev/null 2>/dev/null') != 0:
- logger.error('could not find gdb binary (gdb is not installed)')
+ if os.system("which gdb >/dev/null 2>/dev/null") != 0:
+ logger.error("could not find gdb binary (gdb is not installed)")
ret = False
# Assert that FRR utilities exist
- frrdir = config.get('topogen', 'frrdir')
+ frrdir = config.get("topogen", "frrdir")
hasfrr = False
if not os.path.isdir(frrdir):
- logger.error('could not find {} directory'.format(frrdir))
+ logger.error("could not find {} directory".format(frrdir))
ret = False
else:
hasfrr = True
try:
- pwd.getpwnam('frr')[2]
+ pwd.getpwnam("frr")[2]
except KeyError:
logger.warning('could not find "frr" user')
try:
- grp.getgrnam('frr')[2]
+ grp.getgrnam("frr")[2]
except KeyError:
logger.warning('could not find "frr" group')
try:
- if 'frr' not in grp.getgrnam('frrvty').gr_mem:
- logger.error('"frr" user and group exist, but user is not under "frrvty"')
+ if "frr" not in grp.getgrnam("frrvty").gr_mem:
+ logger.error(
+ '"frr" user and group exist, but user is not under "frrvty"'
+ )
except KeyError:
logger.warning('could not find "frrvty" group')
- for fname in ['zebra', 'ospfd', 'ospf6d', 'bgpd', 'ripd', 'ripngd',
- 'isisd', 'pimd', 'ldpd']:
+ for fname in [
+ "zebra",
+ "ospfd",
+ "ospf6d",
+ "bgpd",
+ "ripd",
+ "ripngd",
+ "isisd",
+ "pimd",
+ "ldpd",
+ ]:
path = os.path.join(frrdir, fname)
if not os.path.isfile(path):
# LDPd is an exception
- if fname == 'ldpd':
- logger.info('could not find {} in {}'.format(fname, frrdir) +
- '(LDPd tests will not run)')
+ if fname == "ldpd":
+ logger.info(
+ "could not find {} in {}".format(fname, frrdir)
+ + "(LDPd tests will not run)"
+ )
continue
- logger.warning('could not find {} in {}'.format(fname, frrdir))
+ logger.warning("could not find {} in {}".format(fname, frrdir))
ret = False
else:
- if fname != 'zebra':
+ if fname != "zebra":
continue
- os.system(
- '{} -v 2>&1 >/tmp/topotests/frr_zebra.txt'.format(path)
- )
+ os.system("{} -v 2>&1 >/tmp/topotests/frr_zebra.txt".format(path))
# Assert that Quagga utilities exist
- quaggadir = config.get('topogen', 'quaggadir')
+ quaggadir = config.get("topogen", "quaggadir")
if hasfrr:
# if we have frr, don't check for quagga
pass
elif not os.path.isdir(quaggadir):
- logger.info('could not find {} directory (quagga tests will not run)'.format(quaggadir))
+ logger.info(
+ "could not find {} directory (quagga tests will not run)".format(quaggadir)
+ )
else:
ret = True
try:
- pwd.getpwnam('quagga')[2]
+ pwd.getpwnam("quagga")[2]
except KeyError:
logger.info('could not find "quagga" user')
try:
- grp.getgrnam('quagga')[2]
+ grp.getgrnam("quagga")[2]
except KeyError:
logger.info('could not find "quagga" group')
try:
- if 'quagga' not in grp.getgrnam('quaggavty').gr_mem:
- logger.error('"quagga" user and group exist, but user is not under "quaggavty"')
+ if "quagga" not in grp.getgrnam("quaggavty").gr_mem:
+ logger.error(
+ '"quagga" user and group exist, but user is not under "quaggavty"'
+ )
except KeyError:
logger.warning('could not find "quaggavty" group')
- for fname in ['zebra', 'ospfd', 'ospf6d', 'bgpd', 'ripd', 'ripngd',
- 'isisd', 'pimd']:
+ for fname in [
+ "zebra",
+ "ospfd",
+ "ospf6d",
+ "bgpd",
+ "ripd",
+ "ripngd",
+ "isisd",
+ "pimd",
+ ]:
path = os.path.join(quaggadir, fname)
if not os.path.isfile(path):
- logger.warning('could not find {} in {}'.format(fname, quaggadir))
+ logger.warning("could not find {} in {}".format(fname, quaggadir))
ret = False
else:
- if fname != 'zebra':
+ if fname != "zebra":
continue
- os.system(
- '{} -v 2>&1 >/tmp/topotests/quagga_zebra.txt'.format(path)
- )
+ os.system("{} -v 2>&1 >/tmp/topotests/quagga_zebra.txt".format(path))
# Test MPLS availability
krel = platform.release()
- if topotest.version_cmp(krel, '4.5') < 0:
- logger.info('LDPd tests will not run (have kernel "{}", but it requires 4.5)'.format(krel))
+ if topotest.version_cmp(krel, "4.5") < 0:
+ logger.info(
+ 'LDPd tests will not run (have kernel "{}", but it requires 4.5)'.format(
+ krel
+ )
+ )
# Test for MPLS Kernel modules available
- if not topotest.module_present('mpls-router', load=False) != 0:
- logger.info('LDPd tests will not run (missing mpls-router kernel module)')
- if not topotest.module_present('mpls-iptunnel', load=False) != 0:
- logger.info('LDPd tests will not run (missing mpls-iptunnel kernel module)')
+ if not topotest.module_present("mpls-router", load=False) != 0:
+ logger.info("LDPd tests will not run (missing mpls-router kernel module)")
+ if not topotest.module_present("mpls-iptunnel", load=False) != 0:
+ logger.info("LDPd tests will not run (missing mpls-iptunnel kernel module)")
# TODO remove me when we start supporting exabgp >= 4
try:
- output = subprocess.check_output(['exabgp', '-v'])
- line = output.split('\n')[0]
- version = line.split(' ')[2]
- if topotest.version_cmp(version, '4') >= 0:
- logger.warning('BGP topologies are still using exabgp version 3, expect failures')
+ output = subprocess.check_output(["exabgp", "-v"])
+ line = output.split("\n")[0]
+ version = line.split(" ")[2]
+ if topotest.version_cmp(version, "4") >= 0:
+ logger.warning(
+ "BGP topologies are still using exabgp version 3, expect failures"
+ )
# We want to catch all exceptions
# pylint: disable=W0702
except:
- logger.warning('failed to find exabgp or returned error')
+ logger.warning("failed to find exabgp or returned error")
# After we logged the output to file, remove the handler.
logger.removeHandler(fhandler)
return ret
+
def diagnose_env_freebsd():
return True
+
def diagnose_env():
if sys.platform.startswith("linux"):
return diagnose_env_linux()