--- /dev/null
+#!/usr/bin/env python
+
+#
+# test_bgp_flowspec_topo.py
+# Part of NetDEF Topology Tests
+#
+# Copyright (c) 2019 by 6WIND
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+test_bgp_flowspec_topo.py: Test BGP topology with Flowspec EBGP peering
+
+
+ +------+------+
+ | peer1 |
+ | BGP peer 1 |
+ |192.168.0.161|
+ | |
+ +------+------+
+ .2 | r1-eth0
+ |
+ ~~~~~~~~~
+ +---~~ s1 ~~------+
+ ~~ ~~
+ ~~~~~~~~~
+ | 10.0.1.1 r1-eth0
+ | 1001::1 r1-eth0
+ +--------+--------+
+ | r1 |
+ |BGP 192.168.0.162|
+ | |
+ | |
+ | |
+ +-----------------+
+
+"""
+
+import json
+import functools
+import os
+import sys
+import pytest
+import getopt
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib import topotest
+from lib.topogen import Topogen, TopoRouter, get_topogen
+from lib.topolog import logger
+from lib.lutil import lUtil
+from lib.lutil import luCommand
+
+# Required to instantiate the topology builder class.
+from mininet.topo import Topo
+
+#####################################################
+##
+## Network Topology Definition
+##
+#####################################################
+
+
+class BGPFLOWSPECTopo1(Topo):
+ "BGP EBGP Flowspec Topology 1"
+
+ def build(self, **_opts):
+ tgen = get_topogen(self)
+
+ # Setup Routers
+ tgen.add_router("r1")
+
+ # Setup Control Path Switch 1. r1-eth0
+ switch = tgen.add_switch("s1")
+ switch.add_link(tgen.gears["r1"])
+
+ ## Add eBGP ExaBGP neighbors
+ peer_ip = "10.0.1.101" ## peer
+ peer_route = "via 10.0.1.1" ## router
+ peer = tgen.add_exabgp_peer("peer1", ip=peer_ip, defaultRoute=peer_route)
+ switch.add_link(peer)
+
+
+#####################################################
+##
+## Tests starting
+##
+#####################################################
+
+
+def setup_module(module):
+ tgen = Topogen(BGPFLOWSPECTopo1, module.__name__)
+
+ tgen.start_topology()
+ # check for zebra capability
+ router = tgen.gears["r1"]
+
+ # Get r1 reference and run Daemons
+ logger.info("Launching BGP and ZEBRA on r1")
+ router = tgen.gears["r1"]
+ router.load_config(
+ TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format("r1"))
+ )
+ router.load_config(
+ TopoRouter.RD_BGP, os.path.join(CWD, "{}/bgpd.conf".format("r1"))
+ )
+ router.start()
+
+ peer_list = tgen.exabgp_peers()
+ for pname, peer in peer_list.iteritems():
+ peer_dir = os.path.join(CWD, pname)
+ env_file = os.path.join(CWD, "exabgp.env")
+ peer.start(peer_dir, env_file)
+ logger.info(pname)
+
+
+def teardown_module(module):
+ tgen = get_topogen()
+ tgen.stop_topology()
+
+
+def test_bgp_convergence():
+ "Test for BGP topology convergence"
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("waiting for bgp convergence")
+
+ # Expected result
+ router = tgen.gears["r1"]
+ reffile = os.path.join(CWD, "r1/summary.txt")
+
+ expected = json.loads(open(reffile).read())
+
+ test_func = functools.partial(
+ topotest.router_json_cmp, router, "show bgp summary json", expected
+ )
+ _, res = topotest.run_and_expect(test_func, None, count=90, wait=0.5)
+ assertmsg = "BGP router network did not converge"
+ assert res is None, assertmsg
+
+
+def test_bgp_flowspec():
+ tgen = get_topogen()
+
+ # Skip if previous fatal error condition is raised
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ router = tgen.gears["r1"]
+
+ logger.info("Check BGP FS entry for 3.3.3.3 with redirect IP")
+ output = router.vtysh_cmd(
+ "show bgp ipv4 flowspec 3.3.3.3", isjson=False, daemon="bgpd"
+ )
+ logger.info(output)
+ if (
+ "NH 50.0.0.2" not in output
+ or "FS:redirect IP" not in output
+ or "Packet Length < 200" not in output
+ ):
+ assertmsg = "traffic to 3.3.3.3 should have been detected as FS entry. NOK"
+ assert 0, assertmsg
+ else:
+ logger.info("Check BGP FS entry for 3.3.3.3 with redirect IP OK")
+
+ logger.info("Check BGP FS entry for 3::3 with redirect IP")
+ output = router.vtysh_cmd(
+ "show bgp ipv6 flowspec 3::3", isjson=False, daemon="bgpd"
+ )
+ logger.info(output)
+ if (
+ "NH 50::2" not in output
+ or "FS:redirect IP" not in output
+ or "Packet Length < 200" not in output
+ ):
+ assertmsg = "traffic to 3::3 should have been detected as FS entry. NOK"
+ assert 0, assertmsg
+ else:
+ logger.info("Check BGP FS entry for 3::3 with redirect IP OK")
+
+if __name__ == "__main__":
+
+ args = ["-s"] + sys.argv[1:]
+ ret = pytest.main(args)
+
+ sys.exit(ret)