summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/topolog.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/topolog.py')
-rw-r--r--tests/topotests/lib/topolog.py160
1 files changed, 78 insertions, 82 deletions
diff --git a/tests/topotests/lib/topolog.py b/tests/topotests/lib/topolog.py
index b501670789..aceb2cb031 100644
--- a/tests/topotests/lib/topolog.py
+++ b/tests/topotests/lib/topolog.py
@@ -15,13 +15,6 @@ This file defines our logging abstraction.
import logging
import os
-import subprocess
-import sys
-
-if sys.version_info[0] > 2:
- pass
-else:
- pass
try:
from xdist import is_xdist_controller
@@ -31,8 +24,6 @@ except ImportError:
return False
-BASENAME = "topolog"
-
# Helper dictionary to convert Topogen logging levels to Python's logging.
DEBUG_TOPO2LOGGING = {
"debug": logging.DEBUG,
@@ -42,13 +33,43 @@ DEBUG_TOPO2LOGGING = {
"error": logging.ERROR,
"critical": logging.CRITICAL,
}
-FORMAT = "%(asctime)s.%(msecs)03d %(levelname)s: %(name)s: %(message)s"
+FORMAT = "%(asctime)s %(levelname)s: %(name)s: %(message)s"
handlers = {}
-logger = logging.getLogger("topolog")
+logger = logging.getLogger("topo")
+
+
+# Remove this and use munet version when we move to pytest_asyncio
+def get_test_logdir(nodeid=None, module=False):
+ """Get log directory relative pathname."""
+ xdist_worker = os.getenv("PYTEST_XDIST_WORKER", "")
+ mode = os.getenv("PYTEST_XDIST_MODE", "no")
+
+ # nodeid: all_protocol_startup/test_all_protocol_startup.py::test_router_running
+ # may be missing "::testname" if module is True
+ if not nodeid:
+ nodeid = os.environ["PYTEST_CURRENT_TEST"].split(" ")[0]
+
+ cur_test = nodeid.replace("[", "_").replace("]", "_")
+ if module:
+ idx = cur_test.rfind("::")
+ path = cur_test if idx == -1 else cur_test[:idx]
+ testname = ""
+ else:
+ path, testname = cur_test.split("::")
+ testname = testname.replace("/", ".")
+ path = path[:-3].replace("/", ".")
+ # We use different logdir paths based on how xdist is running.
+ if mode == "each":
+ if module:
+ return os.path.join(path, "worker-logs", xdist_worker)
+ return os.path.join(path, testname, xdist_worker)
+ assert mode in ("no", "load", "loadfile", "loadscope"), f"Unknown dist mode {mode}"
+ return path if module else os.path.join(path, testname)
-def set_handler(l, target=None):
+
+def set_handler(lg, target=None):
if target is None:
h = logging.NullHandler()
else:
@@ -59,106 +80,81 @@ def set_handler(l, target=None):
h.setFormatter(logging.Formatter(fmt=FORMAT))
# Don't filter anything at the handler level
h.setLevel(logging.DEBUG)
- l.addHandler(h)
+ lg.addHandler(h)
return h
-def set_log_level(l, level):
+def set_log_level(lg, level):
"Set the logging level."
# Messages sent to this logger only are created if this level or above.
log_level = DEBUG_TOPO2LOGGING.get(level, level)
- l.setLevel(log_level)
+ lg.setLevel(log_level)
-def get_logger(name, log_level=None, target=None):
- l = logging.getLogger("{}.{}".format(BASENAME, name))
+def reset_logger(lg):
+ while lg.handlers:
+ x = lg.handlers.pop()
+ x.close()
+ lg.removeHandler(x)
- if log_level is not None:
- set_log_level(l, log_level)
- if target is not None:
- set_handler(l, target)
-
- return l
-
-
-# nodeid: all_protocol_startup/test_all_protocol_startup.py::test_router_running
-
-
-def get_test_logdir(nodeid=None):
- """Get log directory relative pathname."""
- xdist_worker = os.getenv("PYTEST_XDIST_WORKER", "")
- mode = os.getenv("PYTEST_XDIST_MODE", "no")
+def get_logger(name, log_level=None, target=None, reset=True):
+ lg = logging.getLogger(name)
- if not nodeid:
- nodeid = os.environ["PYTEST_CURRENT_TEST"].split(" ")[0]
+ if reset:
+ reset_logger(lg)
- cur_test = nodeid.replace("[", "_").replace("]", "_")
- path, testname = cur_test.split("::")
- path = path[:-3].replace("/", ".")
+ if log_level is not None:
+ set_log_level(lg, log_level)
- # We use different logdir paths based on how xdist is running.
- if mode == "each":
- return os.path.join(path, testname, xdist_worker)
- elif mode == "load":
- return os.path.join(path, testname)
- else:
- assert (
- mode == "no" or mode == "loadfile" or mode == "loadscope"
- ), "Unknown dist mode {}".format(mode)
+ if target is not None:
+ set_handler(lg, target)
- return path
+ return lg
-def logstart(nodeid, location, rundir):
+def logstart(nodeid, logpath):
"""Called from pytest before module setup."""
-
- mode = os.getenv("PYTEST_XDIST_MODE", "no")
worker = os.getenv("PYTEST_TOPOTEST_WORKER", "")
+ wstr = f" on worker {worker}" if worker else ""
+ handler_id = nodeid + worker
+ logpath = logpath.absolute()
- # We only per-test log in the workers (or non-dist)
- if not worker and mode != "no":
- return
+ logging.debug("logstart: adding logging for %s%s at %s", nodeid, wstr, logpath)
+ root_logger = logging.getLogger()
+ handler = logging.FileHandler(logpath, mode="w")
+ handler.setFormatter(logging.Formatter(FORMAT))
- handler_id = nodeid + worker
- assert handler_id not in handlers
-
- rel_log_dir = get_test_logdir(nodeid)
- exec_log_dir = os.path.join(rundir, rel_log_dir)
- subprocess.check_call(
- "mkdir -p {0} && chmod 1777 {0}".format(exec_log_dir), shell=True
- )
- exec_log_path = os.path.join(exec_log_dir, "exec.log")
-
- # Add test based exec log handler
- h = set_handler(logger, exec_log_path)
- handlers[handler_id] = h
-
- if worker:
- logger.info(
- "Logging on worker %s for %s into %s", worker, handler_id, exec_log_path
- )
- else:
- logger.info("Logging for %s into %s", handler_id, exec_log_path)
+ root_logger.addHandler(handler)
+ handlers[handler_id] = handler
+ logging.debug("logstart: added logging for %s%s at %s", nodeid, wstr, logpath)
+ return handler
-def logfinish(nodeid, location):
- """Called from pytest after module teardown."""
- # This function may not be called if pytest is interrupted.
+def logfinish(nodeid, logpath):
+ """Called from pytest after module teardown."""
worker = os.getenv("PYTEST_TOPOTEST_WORKER", "")
- handler_id = nodeid + worker
+ wstr = f" on worker {worker}" if worker else ""
+
+ root_logger = logging.getLogger()
- if handler_id in handlers:
- # Remove test based exec log handler
- if worker:
- logger.info("Closing logs for %s", handler_id)
+ handler_id = nodeid + worker
+ if handler_id not in handlers:
+ logging.critical("can't find log handler to remove")
+ else:
+ logging.debug(
+ "logfinish: removing logging for %s%s at %s", nodeid, wstr, logpath
+ )
h = handlers[handler_id]
- logger.removeHandler(handlers[handler_id])
+ root_logger.removeHandler(h)
h.flush()
h.close()
del handlers[handler_id]
+ logging.debug(
+ "logfinish: removed logging for %s%s at %s", nodeid, wstr, logpath
+ )
console_handler = set_handler(logger, None)