--- /dev/null
+#!/usr/bin/env python
+
+#
+# test_all_protocol_startup.py
+# Part of NetDEF Topology Tests
+#
+# Copyright (c) 2017 by
+# Network Device Education Foundation, Inc. ("NetDEF")
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+test_all_protocol_startup.py: Test of all protocols at same time
+
+"""
+
+import os
+import re
+import sys
+import difflib
+import pytest
+from time import sleep
+
+from mininet.topo import Topo
+from mininet.net import Mininet
+from mininet.node import Node, OVSSwitch, Host
+from mininet.log import setLogLevel, info
+from mininet.cli import CLI
+from mininet.link import Intf
+
+from functools import partial
+
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+from lib import topotest
+
+fatal_error = ""
+
+
+#####################################################
+##
+## Network Topology Definition
+##
+#####################################################
+
+class NetworkTopo(Topo):
+ "All Protocol Startup Test"
+
+ def build(self, **_opts):
+
+ # Setup Routers
+ router = {}
+ #
+ # Setup Main Router
+ router[1] = topotest.addRouter(self, 'r1')
+ #
+
+ # Setup Switches
+ switch = {}
+ #
+ for i in range(0, 10):
+ switch[i] = self.addSwitch('sw%s' % i, cls=topotest.LegacySwitch)
+ self.addLink(switch[i], router[1], intfName2='r1-eth%s' % i )
+
+
+#####################################################
+##
+## Tests starting
+##
+#####################################################
+
+def setup_module(module):
+ global topo, net
+ global fatal_error
+
+ print("\n\n** %s: Setup Topology" % module.__name__)
+ print("******************************************\n")
+
+ print("Cleanup old Mininet runs")
+ os.system('sudo mn -c > /dev/null 2>&1')
+ os.system('sudo rm /tmp/r* > /dev/null 2>&1')
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+ topo = NetworkTopo()
+
+ net = Mininet(controller=None, topo=topo)
+ net.start()
+
+ if net['r1'].get_routertype() != 'frr':
+ fatal_error = "Test is only implemented for FRR"
+ sys.stderr.write('\n\nTest is only implemented for FRR - Skipping\n\n')
+ pytest.skip(fatal_error)
+
+ # Starting Routers
+ #
+ # Main router
+ for i in range(1, 2):
+ net['r%s' % i].loadConf('zebra', '%s/r%s/zebra.conf' % (thisDir, i))
+ net['r%s' % i].loadConf('ripd', '%s/r%s/ripd.conf' % (thisDir, i))
+ net['r%s' % i].loadConf('ripngd', '%s/r%s/ripngd.conf' % (thisDir, i))
+ net['r%s' % i].loadConf('ospfd', '%s/r%s/ospfd.conf' % (thisDir, i))
+ net['r%s' % i].loadConf('ospf6d', '%s/r%s/ospf6d.conf' % (thisDir, i))
+ net['r%s' % i].loadConf('isisd', '%s/r%s/isisd.conf' % (thisDir, i))
+ net['r%s' % i].loadConf('bgpd', '%s/r%s/bgpd.conf' % (thisDir, i))
+ if net['r%s' % i].daemon_available('ldpd'):
+ # Only test LDPd if it's installed and Kernel >= 4.5
+ net['r%s' % i].loadConf('ldpd', '%s/r%s/ldpd.conf' % (thisDir, i))
+ net['r%s' % i].startRouter()
+
+ # For debugging after starting Quagga/FRR daemons, uncomment the next line
+ # CLI(net)
+
+
+def teardown_module(module):
+ global net
+
+ print("\n\n** %s: Shutdown Topology" % module.__name__)
+ print("******************************************\n")
+
+ # End - Shutdown network
+ net.stop()
+
+
+def test_router_running():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ print("\n\n** Check if FRR/Quagga is running on each Router node")
+ print("******************************************\n")
+ sleep(5)
+
+ # Starting Routers
+ for i in range(1, 2):
+ fatal_error = net['r%s' % i].checkRouterRunning()
+ assert fatal_error == "", fatal_error
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_error_messages_vtysh():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ print("\n\n** Check for error messages on VTYSH")
+ print("******************************************\n")
+
+ failures = 0
+ for i in range(1, 2):
+ #
+ # First checking Standard Output
+ #
+
+ # VTYSH output from router
+ vtystdout = net['r%s' % i].cmd('vtysh -c "show version" 2> /dev/null').rstrip()
+
+ # Fix newlines (make them all the same)
+ vtystdout = ('\n'.join(vtystdout.splitlines()) + '\n').rstrip()
+ # Drop everything starting with "FRRouting X.xx" message
+ vtystdout = re.sub(r"FRRouting [0-9]+.*", "", vtystdout, flags=re.DOTALL)
+
+ if (vtystdout != ''):
+ sys.stderr.write('\nr%s created some spurious VTYSH start StdOut messages:\n%s\n' % (i, vtystdout))
+ failures += 1
+ else:
+ print("r%s StdOut ok" % i)
+
+ #
+ # Second checking Standard Error
+ #
+
+ # VTYSH StdErr output from router
+ vtystderr = net['r%s' % i].cmd('vtysh -c "show version" > /dev/null').rstrip()
+
+ # Fix newlines (make them all the same)
+ vtystderr = ('\n'.join(vtystderr.splitlines()) + '\n').rstrip()
+ # # Drop everything starting with "FRRouting X.xx" message
+ # vtystderr = re.sub(r"FRRouting [0-9]+.*", "", vtystderr, flags=re.DOTALL)
+
+ if (vtystderr != ''):
+ sys.stderr.write('\nr%s created some spurious VTYSH start StdErr messages:\n<%s>\n' % (i, vtystderr))
+ failures += 1
+ else:
+ print("r%s StdErr ok" % i)
+
+ assert failures == 0, "IP RIP status failed for router r%s:\n%s" % (i, diff)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_error_messages_daemons():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ print("\n\n** Check for error messages in daemons")
+ print("******************************************\n")
+
+ error_logs = ""
+
+ for i in range(1, 2):
+ log = net['r%s' % i].getStdErr('ripd')
+ if log:
+ error_logs += "r%s RIPd StdErr Output:\n" % i
+ error_logs += log
+ log = net['r%s' % i].getStdErr('ripngd')
+ if log:
+ error_logs += "r%s RIPngd StdErr Output:\n" % i
+ error_logs += log
+ log = net['r%s' % i].getStdErr('ospfd')
+ if log:
+ error_logs += "r%s OSPFd StdErr Output:\n" % i
+ error_logs += log
+ log = net['r%s' % i].getStdErr('ospf6d')
+ if log:
+ error_logs += "r%s OSPF6d StdErr Output:\n" % i
+ error_logs += log
+ log = net['r%s' % i].getStdErr('isisd')
+ # ISIS shows debugging enabled status on StdErr
+ # Remove these messages
+ log = re.sub(r"^IS-IS .* debugging is on.*", "", log).rstrip()
+ if log:
+ error_logs += "r%s ISISd StdErr Output:\n" % i
+ error_logs += log
+ log = net['r%s' % i].getStdErr('bgpd')
+ if log:
+ error_logs += "r%s BGPd StdErr Output:\n" % i
+ error_logs += log
+ if (net['r%s' % i].daemon_available('ldpd')):
+ log = net['r%s' % i].getStdErr('ldpd')
+ if log:
+ error_logs += "r%s LDPd StdErr Output:\n" % i
+ error_logs += log
+ log = net['r%s' % i].getStdErr('zebra')
+ if log:
+ error_logs += "r%s Zebra StdErr Output:\n"
+ error_logs += log
+
+ if error_logs:
+ sys.stderr.write('Failed check for StdErr Output on daemons:\n%s\n' % error_logs)
+
+ assert error_logs == "", "Daemons report errors to StdErr"
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_converge_protocols():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ print("\n\n** Waiting for protocols convergence")
+ print("******************************************\n")
+
+ # Not really implemented yet - just sleep 60 secs for now
+ sleep(60)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ ## CLI(net)
+
+
+def test_rip_status():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ # Verify RIP Status
+ print("\n\n** Verifing RIP status")
+ print("******************************************\n")
+ failures = 0
+ for i in range(1, 2):
+ refTableFile = '%s/r%s/rip_status.ref' % (thisDir, i)
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show ip rip status" 2> /dev/null').rstrip()
+ # Drop time in next due
+ actual = re.sub(r"in [0-9]+ seconds", "in XX seconds", actual)
+ # Drop time in last update
+ actual = re.sub(r" [0-2][0-9]:[0-5][0-9]:[0-5][0-9]", " XX:XX:XX", actual)
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = ''.join(difflib.context_diff(actual, expected,
+ fromfile="actual IP RIP status",
+ tofile="expected IP RIP status"))
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ sys.stderr.write('r%s failed IP RIP status check:\n%s\n' % (i, diff))
+ failures += 1
+ else:
+ print("r%s ok" % i)
+
+ assert failures == 0, "IP RIP status failed for router r%s:\n%s" % (i, diff)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_ripng_status():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ # Verify RIP Status
+ print("\n\n** Verifing RIPng status")
+ print("******************************************\n")
+ failures = 0
+ for i in range(1, 2):
+ refTableFile = '%s/r%s/ripng_status.ref' % (thisDir, i)
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show ipv6 ripng status" 2> /dev/null').rstrip()
+ # Mask out Link-Local mac address portion. They are random...
+ actual = re.sub(r" fe80::[0-9a-f:]+", " fe80::XXXX:XXXX:XXXX:XXXX", actual)
+ # Drop time in next due
+ actual = re.sub(r"in [0-9]+ seconds", "in XX seconds", actual)
+ # Drop time in last update
+ actual = re.sub(r" [0-2][0-9]:[0-5][0-9]:[0-5][0-9]", " XX:XX:XX", actual)
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = ''.join(difflib.context_diff(actual, expected,
+ fromfile="actual IPv6 RIPng status",
+ tofile="expected IPv6 RIPng status"))
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ sys.stderr.write('r%s failed IPv6 RIPng status check:\n%s\n' % (i, diff))
+ failures += 1
+ else:
+ print("r%s ok" % i)
+
+ assert failures == 0, "IPv6 RIPng status failed for router r%s:\n%s" % (i, diff)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_ospfv2_interfaces():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ # Verify RIP Status
+ print("\n\n** Verifing OSPFv2 interfaces")
+ print("******************************************\n")
+ failures = 0
+ for i in range(1, 2):
+ refTableFile = '%s/r%s/show_ip_ospf_interface.ref' % (thisDir, i)
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show ip ospf interface" 2> /dev/null').rstrip()
+ # Mask out Bandwidth portion. They may change..
+ actual = re.sub(r"BW [0-9]+ Mbit", "BW XX Mbit", actual)
+ # Drop time in next due
+ actual = re.sub(r"Hello due in [0-9\.]+s", "Hello due in XX.XXXs", actual)
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = ''.join(difflib.context_diff(actual, expected,
+ fromfile="actual SHOW IP OSPF INTERFACE",
+ tofile="expected SHOW IP OSPF INTERFACE"))
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ sys.stderr.write('r%s failed SHOW IP OSPF INTERFACE check:\n%s\n' % (i, diff))
+ failures += 1
+ else:
+ print("r%s ok" % i)
+
+ assert failures == 0, "SHOW IP OSPF INTERFACE failed for router r%s:\n%s" % (i, diff)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_isis_interfaces():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ # Verify RIP Status
+ print("\n\n** Verifing ISIS interfaces")
+ print("******************************************\n")
+ failures = 0
+ for i in range(1, 2):
+ refTableFile = '%s/r%s/show_isis_interface_detail.ref' % (thisDir, i)
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show isis interface detail" 2> /dev/null').rstrip()
+ # Mask out Link-Local mac address portion. They are random...
+ actual = re.sub(r"fe80::[0-9a-f:]+", "fe80::XXXX:XXXX:XXXX:XXXX", actual)
+ # Mask out SNPA mac address portion. They are random...
+ actual = re.sub(r"SNPA: [0-9a-f\.]+", "SNPA: XXXX.XXXX.XXXX", actual)
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = ''.join(difflib.context_diff(actual, expected,
+ fromfile="actual SHOW ISIS INTERFACE DETAIL",
+ tofile="expected SHOW ISIS OSPF6 INTERFACE DETAIL"))
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ sys.stderr.write('r%s failed SHOW ISIS INTERFACE DETAIL check:\n%s\n' % (i, diff))
+ failures += 1
+ else:
+ print("r%s ok" % i)
+
+ assert failures == 0, "SHOW ISIS INTERFACE DETAIL failed for router r%s:\n%s" % (i, diff)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_bgp_summary():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ # Verify RIP Status
+ print("\n\n** Verifing BGP Summary")
+ print("******************************************\n")
+ failures = 0
+ for i in range(1, 2):
+ refTableFile = '%s/r%s/show_bgp_summary.ref' % (thisDir, i)
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show bgp summary" 2> /dev/null').rstrip()
+ # Mask out "using XXiXX bytes" portion. They are random...
+ actual = re.sub(r"using [0-9]+ bytes", "using XXXX bytes", actual)
+ # Mask out "using XiXXX KiB" portion. They are random...
+ actual = re.sub(r"using [0-9]+ KiB", "using XXXX KiB", actual)
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = ''.join(difflib.context_diff(actual, expected,
+ fromfile="actual SHOW BGP SUMMARY",
+ tofile="expected SHOW BGP SUMMARY"))
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ sys.stderr.write('r%s failed SHOW BGP SUMMARY check:\n%s\n' % (i, diff))
+ failures += 1
+ else:
+ print("r%s ok" % i)
+
+ assert failures == 0, "SHOW SHOW BGP SUMMARY failed for router r%s:\n%s" % (i, diff)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_bgp_ipv6_summary():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ # Verify RIP Status
+ print("\n\n** Verifing BGP IPv6 Summary")
+ print("******************************************\n")
+ failures = 0
+ for i in range(1, 2):
+ refTableFile = '%s/r%s/show_bgp_ipv6_summary.ref' % (thisDir, i)
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show bgp ipv6 summary" 2> /dev/null').rstrip()
+ # Mask out "using XXiXX bytes" portion. They are random...
+ actual = re.sub(r"using [0-9]+ bytes", "using XXXX bytes", actual)
+ # Mask out "using XiXXX KiB" portion. They are random...
+ actual = re.sub(r"using [0-9]+ KiB", "using XXXX KiB", actual)
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = ''.join(difflib.context_diff(actual, expected,
+ fromfile="actual SHOW BGP IPv6 SUMMARY",
+ tofile="expected SHOW BGP IPv6 SUMMARY"))
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ sys.stderr.write('r%s failed SHOW BGP IPv6 SUMMARY check:\n%s\n' % (i, diff))
+ failures += 1
+ else:
+ print("r%s ok" % i)
+
+ assert failures == 0, "SHOW BGP IPv6 SUMMARY failed for router r%s:\n%s" % (i, diff)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_bgp_ipv4():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ # Verify RIP Status
+ print("\n\n** Verifing BGP IPv4")
+ print("******************************************\n")
+ failures = 0
+ for i in range(1, 2):
+ refTableFile = '%s/r%s/show_bgp_ipv4.ref' % (thisDir, i)
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show bgp ipv4" 2> /dev/null').rstrip()
+ # Remove summary line (changed recently)
+ actual = re.sub(r'Total number.*', '', actual)
+ actual = re.sub(r'Displayed.*', '', actual)
+ actual = actual.rstrip()
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = ''.join(difflib.context_diff(actual, expected,
+ fromfile="actual SHOW BGP IPv4",
+ tofile="expected SHOW BGP IPv4"))
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ sys.stderr.write('r%s failed SHOW BGP IPv4 check:\n%s\n' % (i, diff))
+ failures += 1
+ else:
+ print("r%s ok" % i)
+
+ assert failures == 0, "SHOW BGP IPv4 failed for router r%s:\n%s" % (i, diff)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_bgp_ipv6():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ # Verify RIP Status
+ print("\n\n** Verifing BGP IPv6")
+ print("******************************************\n")
+ failures = 0
+ for i in range(1, 2):
+ refTableFile = '%s/r%s/show_bgp_ipv6.ref' % (thisDir, i)
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show bgp ipv6" 2> /dev/null').rstrip()
+ # Remove summary line (changed recently)
+ actual = re.sub(r'Total number.*', '', actual)
+ actual = re.sub(r'Displayed.*', '', actual)
+ actual = actual.rstrip()
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = ''.join(difflib.context_diff(actual, expected,
+ fromfile="actual SHOW BGP IPv6",
+ tofile="expected SHOW BGP IPv6"))
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ sys.stderr.write('r%s failed SHOW BGP IPv6 check:\n%s\n' % (i, diff))
+ failures += 1
+ else:
+ print("r%s ok" % i)
+
+ assert failures == 0, "SHOW BGP IPv6 failed for router r%s:\n%s" % (i, diff)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+
+def test_mpls_interfaces():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ # Skip if no LDP installed or old kernel
+ if (net['r1'].daemon_available('ldpd') == False):
+ pytest.skip("No MPLS or kernel < 4.5")
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ # Verify OSPFv3 Routing Table
+ print("\n\n** Verifing MPLS Interfaces")
+ print("******************************************\n")
+ failures = 0
+ for i in range(1, 2):
+ refTableFile = '%s/r%s/show_mpls_ldp_interface.ref' % (thisDir, i)
+ if os.path.isfile(refTableFile):
+ # Read expected result from file
+ expected = open(refTableFile).read().rstrip()
+ # Fix newlines (make them all the same)
+ expected = ('\n'.join(expected.splitlines()) + '\n').splitlines(1)
+
+ # Actual output from router
+ actual = net['r%s' % i].cmd('vtysh -c "show mpls ldp interface" 2> /dev/null').rstrip()
+ # Mask out Timer in Uptime
+ actual = re.sub(r" [0-9][0-9]:[0-9][0-9]:[0-9][0-9] ", " xx:xx:xx ", actual)
+ # Fix newlines (make them all the same)
+ actual = ('\n'.join(actual.splitlines()) + '\n').splitlines(1)
+
+ # Generate Diff
+ diff = ''.join(difflib.context_diff(actual, expected,
+ fromfile="actual MPLS LDP interface status",
+ tofile="expected MPLS LDP interface status"))
+
+ # Empty string if it matches, otherwise diff contains unified diff
+ if diff:
+ sys.stderr.write('r%s failed MPLS LDP Interface status Check:\n%s\n' % (i, diff))
+ failures += 1
+ else:
+ print("r%s ok" % i)
+
+ if failures>0:
+ fatal_error = "MPLS LDP Interface status failed"
+
+ assert failures == 0, "MPLS LDP Interface status failed for router r%s:\n%s" % (i, diff)
+
+ # For debugging after starting FRR/Quagga daemons, uncomment the next line
+ # CLI(net)
+
+
+def test_shutdown_check_stderr():
+ global fatal_error
+ global net
+
+ # Skip if previous fatal error condition is raised
+ if (fatal_error != ""):
+ pytest.skip(fatal_error)
+
+ print("\n\n** Verifing unexpected STDERR output from daemons")
+ print("******************************************\n")
+
+ if os.environ.get('TOPOTESTS_CHECK_STDERR') is None:
+ print("SKIPPED (Disabled) - TOPOTESTS_CHECK_STDERR undefined\n")
+ pytest.skip('Skipping test for Stderr output and memory leaks')
+
+ thisDir = os.path.dirname(os.path.realpath(__file__))
+
+ net['r1'].stopRouter()
+
+ log = net['r1'].getStdErr('ripd')
+ print("\nRIPd StdErr Log:\n" + log)
+ log = net['r1'].getStdErr('ripngd')
+ print("\nRIPngd StdErr Log:\n" + log)
+ log = net['r1'].getStdErr('ospfd')
+ print("\nOSPFd StdErr Log:\n" + log)
+ log = net['r1'].getStdErr('ospf6d')
+ print("\nOSPF6d StdErr Log:\n" + log)
+ log = net['r1'].getStdErr('isisd')
+ print("\nISISd StdErr Log:\n" + log)
+ log = net['r1'].getStdErr('bgpd')
+ print("\nBGPd StdErr Log:\n" + log)
+ if (net['r1'].daemon_available('ldpd')):
+ log = net['r1'].getStdErr('ldpd')
+ print("\nLDPd StdErr Log:\n" + log)
+ log = net['r1'].getStdErr('zebra')
+ print("\nZebra StdErr Log:\n" + log)
+
+
+if __name__ == '__main__':
+
+ setLogLevel('info')
+ # To suppress tracebacks, either use the following pytest call or add "--tb=no" to cli
+ # retval = pytest.main(["-s", "--tb=no"])
+ retval = pytest.main(["-s"])
+ sys.exit(retval)