summaryrefslogtreecommitdiff
path: root/tests/topotests/bgp_features/test_bgp_features.py
blob: a27aaf9ec72cef01a34a6a11b809d0adc330deed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
#!/usr/bin/env python

#
# test_bgp_features.py
# Part of NetDEF Topology Tests
#
# Copyright (c) 2020 by
# Network Device Education Foundation, Inc. ("NetDEF")
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#

"""
test_bgp_features.py: Test various BGP features.
"""

import json
import functools
import os
import sys
import pytest
import re

# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))

# pylint: disable=C0413
# Import topogen and topotest helpers
from lib import topotest
from lib.topogen import Topogen, TopoRouter, get_topogen
from lib.topolog import logger

# Required to instantiate the topology builder class.
from mininet.topo import Topo

#####################################################
#
#   Network Topology Definition
#
#####################################################


class BGPFeaturesTopo1(Topo):
    "BGP Features Topology 1"

    def build(self, **_opts):
        tgen = get_topogen(self)

        # Create the routers
        for rtrNum in range(1, 6):
            tgen.add_router("r{}".format(rtrNum))

        # Setup Switches and connections
        for swNum in range(1, 11):
            tgen.add_switch("sw{}".format(swNum))

        # Add connections to stub switches
        tgen.gears["r1"].add_link(tgen.gears["sw6"])
        tgen.gears["r2"].add_link(tgen.gears["sw7"])
        tgen.gears["r3"].add_link(tgen.gears["sw8"])
        tgen.gears["r4"].add_link(tgen.gears["sw9"])
        tgen.gears["r5"].add_link(tgen.gears["sw10"])

        # Add connections to R1-R2-R3 core
        tgen.gears["r1"].add_link(tgen.gears["sw1"])
        tgen.gears["r1"].add_link(tgen.gears["sw3"])
        tgen.gears["r2"].add_link(tgen.gears["sw1"])
        tgen.gears["r2"].add_link(tgen.gears["sw2"])
        tgen.gears["r3"].add_link(tgen.gears["sw2"])
        tgen.gears["r3"].add_link(tgen.gears["sw3"])

        # Add connections to external R4/R5 Routers
        tgen.gears["r1"].add_link(tgen.gears["sw4"])
        tgen.gears["r4"].add_link(tgen.gears["sw4"])
        tgen.gears["r2"].add_link(tgen.gears["sw5"])
        tgen.gears["r5"].add_link(tgen.gears["sw5"])


#####################################################
#
#   Tests starting
#
#####################################################


def setup_module(module):
    tgen = Topogen(BGPFeaturesTopo1, module.__name__)
    tgen.start_topology()

    # Starting Routers
    router_list = tgen.routers()
    for rname, router in router_list.iteritems():
        router.load_config(
            TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
        )
        if os.path.exists(os.path.join(CWD, "{}/bgpd.conf".format(rname))):
            logger.info("{} uses BGPd".format(rname))
            router.load_config(
                TopoRouter.RD_BGP, os.path.join(CWD, "{}/bgpd.conf".format(rname))
            )
        if os.path.exists(os.path.join(CWD, "{}/ospfd.conf".format(rname))):
            logger.info("{} uses OSPFd".format(rname))
            router.load_config(
                TopoRouter.RD_OSPF, os.path.join(CWD, "{}/ospfd.conf".format(rname))
            )
        if os.path.exists(os.path.join(CWD, "{}/ospf6d.conf".format(rname))):
            logger.info("{} uses OSPF6d".format(rname))
            router.load_config(
                TopoRouter.RD_OSPF6, os.path.join(CWD, "{}/ospf6d.conf".format(rname))
            )
        router.start()


def teardown_module(module):
    tgen = get_topogen()
    tgen.stop_topology()


def test_ospf_convergence():
    "Test for OSPFv2 topology convergence"
    tgen = get_topogen()

    # Skip if previous fatal error condition is raised
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    # Check Router r1, r2 & r3 OSPF
    for rtrNum in range(1, 4):
        logger.info("Checking OSPFv2 convergence on router r{}".format(rtrNum))

        router = tgen.gears["r{}".format(rtrNum)]
        reffile = os.path.join(CWD, "r{}/ospf_neighbor.json".format(rtrNum))
        expected = json.loads(open(reffile).read())

        test_func = functools.partial(
            topotest.router_json_cmp, router, "show ip ospf neighbor json", expected
        )
        _, res = topotest.run_and_expect(test_func, None, count=60, wait=2)
        assertmsg = "OSPF router R{} did not converge".format(rtrNum)
        assert res is None, assertmsg


def test_bgp_convergence():
    "Test for BGP topology convergence"
    tgen = get_topogen()

    # Skip if previous fatal error condition is raised
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    # Check Router r1 & r2 BGP
    for rtrNum in [1, 2, 4, 5]:
        logger.info("Checking BGP IPv4 convergence on router r{}".format(rtrNum))

        router = tgen.gears["r{}".format(rtrNum)]
        reffile = os.path.join(CWD, "r{}/bgp_summary.json".format(rtrNum))
        expected = json.loads(open(reffile).read())

        test_func = functools.partial(
            topotest.router_json_cmp, router, "show ip bgp summary json", expected
        )
        _, res = topotest.run_and_expect(test_func, None, count=60, wait=2)
        assertmsg = "BGP router R{} did not converge".format(rtrNum)
        assert res is None, assertmsg

    # tgen.mininet_cli()


def test_bgp_shutdown():
    "Test BGP instance shutdown"

    tgen = get_topogen()

    # Skip if previous fatal error condition is raised
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    tgen.net['r1'].cmd('vtysh -c \"conf t\" -c \"router bgp 65000\" -c \"bgp shutdown message ABCDabcd\"')

    # Check BGP Summary on local and remote routers
    for rtrNum in [1, 2, 4]:
        logger.info("Checking BGP Summary after shutdown of R1 BGP on router r{}".format(rtrNum))

        router = tgen.gears["r{}".format(rtrNum)]
        reffile = os.path.join(CWD, "r{}/bgp_shutdown_summary.json".format(rtrNum))
        expected = json.loads(open(reffile).read())

        test_func = functools.partial(
            topotest.router_json_cmp, router, "show ip bgp summary json", expected
        )
        _, res = topotest.run_and_expect(test_func, None, count=60, wait=2)
        assertmsg = "BGP sessions on router R{} are in incorrect state (not down as expected?)".format(rtrNum)
        assert res is None, assertmsg


def test_bgp_shutdown_message():
    "Test BGP Peer Shutdown Message"

    tgen = get_topogen()

    # Skip if previous fatal error condition is raised
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    for rtrNum in [2, 4]:
        logger.info("Checking BGP shutdown received on router r{}".format(rtrNum))

        shut_message = tgen.net['r{}'.format(rtrNum)].cmd(
            'tail bgpd.log | grep "NOTIFICATION.*Cease/Administratively Shutdown"')
        assertmsg = "BGP shutdown message not received on router R{}".format(rtrNum)
        assert shut_message != '', assertmsg

        m = re.search('.*([0-9]+ bytes[ 0-9a-fA-F]+)', shut_message)
        if m:
            found = m.group(1)
        else:
            found = ''
        assertmsg = "Incorrect BGP shutdown message received on router R{}".format(rtrNum)
        assert found == '8 bytes 41 42 43 44 61 62 63 64', assertmsg

    # tgen.mininet_cli()


def test_bgp_no_shutdown():
    "Test BGP instance no shutdown"

    tgen = get_topogen()

    # Skip if previous fatal error condition is raised
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    tgen.net['r1'].cmd('vtysh -c \"conf t\" -c \"router bgp 65000\" -c \"no bgp shutdown\"')

    # Check BGP Summary on local and remote routers
    for rtrNum in [1, 2, 4]:
        logger.info("Checking BGP Summary after removing bgp shutdown on router r1 on router r{}".format(rtrNum))

        router = tgen.gears["r{}".format(rtrNum)]
        reffile = os.path.join(CWD, "r{}/bgp_summary.json".format(rtrNum))
        expected = json.loads(open(reffile).read())

        test_func = functools.partial(
            topotest.router_json_cmp, router, "show ip bgp summary json", expected
        )
        _, res = topotest.run_and_expect(test_func, None, count=60, wait=2)
        assertmsg = "BGP sessions on router R{} are in incorrect state (not down as expected?)".format(rtrNum)
        assert res is None, assertmsg



if __name__ == "__main__":
    args = ["-s"] + sys.argv[1:]
    sys.exit(pytest.main(args))