summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/micronet_cli.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/micronet_cli.py')
-rw-r--r--tests/topotests/lib/micronet_cli.py306
1 files changed, 0 insertions, 306 deletions
diff --git a/tests/topotests/lib/micronet_cli.py b/tests/topotests/lib/micronet_cli.py
deleted file mode 100644
index e54b75f710..0000000000
--- a/tests/topotests/lib/micronet_cli.py
+++ /dev/null
@@ -1,306 +0,0 @@
-# -*- coding: utf-8 eval: (blacken-mode 1) -*-
-# SPDX-License-Identifier: GPL-2.0-or-later
-#
-# July 24 2021, Christian Hopps <chopps@labn.net>
-#
-# Copyright (c) 2021, LabN Consulting, L.L.C.
-#
-import argparse
-import logging
-import os
-import pty
-import re
-import readline
-import select
-import socket
-import subprocess
-import sys
-import tempfile
-import termios
-import tty
-
-
-ENDMARKER = b"\x00END\x00"
-
-
-def lineiter(sock):
- s = ""
- while True:
- sb = sock.recv(256)
- if not sb:
- return
-
- s += sb.decode("utf-8")
- i = s.find("\n")
- if i != -1:
- yield s[:i]
- s = s[i + 1 :]
-
-
-def spawn(unet, host, cmd):
- if sys.stdin.isatty():
- old_tty = termios.tcgetattr(sys.stdin)
- tty.setraw(sys.stdin.fileno())
- try:
- master_fd, slave_fd = pty.openpty()
-
- # use os.setsid() make it run in a new process group, or bash job
- # control will not be enabled
- p = unet.hosts[host].popen(
- cmd,
- preexec_fn=os.setsid,
- stdin=slave_fd,
- stdout=slave_fd,
- stderr=slave_fd,
- universal_newlines=True,
- )
-
- while p.poll() is None:
- r, w, e = select.select([sys.stdin, master_fd], [], [], 0.25)
- if sys.stdin in r:
- d = os.read(sys.stdin.fileno(), 10240)
- os.write(master_fd, d)
- elif master_fd in r:
- o = os.read(master_fd, 10240)
- if o:
- os.write(sys.stdout.fileno(), o)
- finally:
- # restore tty settings back
- if sys.stdin.isatty():
- termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
-
-
-def doline(unet, line, writef):
- def host_cmd_split(unet, cmd):
- csplit = cmd.split()
- for i, e in enumerate(csplit):
- if e not in unet.hosts:
- break
- hosts = csplit[:i]
- if not hosts:
- hosts = sorted(unet.hosts.keys())
- cmd = " ".join(csplit[i:])
- return hosts, cmd
-
- line = line.strip()
- m = re.match(r"^(\S+)(?:\s+(.*))?$", line)
- if not m:
- return True
-
- cmd = m.group(1)
- oargs = m.group(2) if m.group(2) else ""
- if cmd == "q" or cmd == "quit":
- return False
- if cmd == "hosts":
- writef("%% hosts: %s\n" % " ".join(sorted(unet.hosts.keys())))
- elif cmd in ["term", "vtysh", "xterm"]:
- args = oargs.split()
- if not args or (len(args) == 1 and args[0] == "*"):
- args = sorted(unet.hosts.keys())
- hosts = [unet.hosts[x] for x in args if x in unet.hosts]
- for host in hosts:
- if cmd == "t" or cmd == "term":
- host.run_in_window("bash", title="sh-%s" % host)
- elif cmd == "v" or cmd == "vtysh":
- host.run_in_window("vtysh", title="vt-%s" % host)
- elif cmd == "x" or cmd == "xterm":
- host.run_in_window("bash", title="sh-%s" % host, forcex=True)
- elif cmd == "sh":
- hosts, cmd = host_cmd_split(unet, oargs)
- for host in hosts:
- if sys.stdin.isatty():
- spawn(unet, host, cmd)
- else:
- if len(hosts) > 1:
- writef("------ Host: %s ------\n" % host)
- output = unet.hosts[host].cmd_legacy(cmd)
- writef(output)
- if len(hosts) > 1:
- writef("------- End: %s ------\n" % host)
- writef("\n")
- elif cmd == "h" or cmd == "help":
- writef(
- """
-Commands:
- help :: this help
- sh [hosts] <shell-command> :: execute <shell-command> on <host>
- term [hosts] :: open shell terminals for hosts
- vtysh [hosts] :: open vtysh terminals for hosts
- [hosts] <vtysh-command> :: execute vtysh-command on hosts\n\n"""
- )
- else:
- hosts, cmd = host_cmd_split(unet, line)
- for host in hosts:
- if len(hosts) > 1:
- writef("------ Host: %s ------\n" % host)
- output = unet.hosts[host].cmd_legacy('vtysh -c "{}"'.format(cmd))
- writef(output)
- if len(hosts) > 1:
- writef("------- End: %s ------\n" % host)
- writef("\n")
- return True
-
-
-def cli_server_setup(unet):
- sockdir = tempfile.mkdtemp("-sockdir", "pyt")
- sockpath = os.path.join(sockdir, "cli-server.sock")
- try:
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.settimeout(10)
- sock.bind(sockpath)
- sock.listen(1)
- return sock, sockdir, sockpath
- except Exception:
- unet.cmd_status("rm -rf " + sockdir)
- raise
-
-
-def cli_server(unet, server_sock):
- sock, addr = server_sock.accept()
-
- # Go into full non-blocking mode now
- sock.settimeout(None)
-
- for line in lineiter(sock):
- line = line.strip()
-
- def writef(x):
- xb = x.encode("utf-8")
- sock.send(xb)
-
- if not doline(unet, line, writef):
- return
- sock.send(ENDMARKER)
-
-
-def cli_client(sockpath, prompt="unet> "):
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.settimeout(10)
- sock.connect(sockpath)
-
- # Go into full non-blocking mode now
- sock.settimeout(None)
-
- print("\n--- Micronet CLI Starting ---\n\n")
- while True:
- if sys.version_info[0] == 2:
- line = raw_input(prompt) # pylint: disable=E0602
- else:
- line = input(prompt)
- if line is None:
- return
-
- # Need to put \n back
- line += "\n"
-
- # Send the CLI command
- sock.send(line.encode("utf-8"))
-
- def bendswith(b, sentinel):
- slen = len(sentinel)
- return len(b) >= slen and b[-slen:] == sentinel
-
- # Collect the output
- rb = b""
- while not bendswith(rb, ENDMARKER):
- lb = sock.recv(4096)
- if not lb:
- return
- rb += lb
-
- # Remove the marker
- rb = rb[: -len(ENDMARKER)]
-
- # Write the output
- sys.stdout.write(rb.decode("utf-8"))
-
-
-def local_cli(unet, outf, prompt="unet> "):
- print("\n--- Micronet CLI Starting ---\n\n")
- while True:
- if sys.version_info[0] == 2:
- line = raw_input(prompt) # pylint: disable=E0602
- else:
- line = input(prompt)
- if line is None:
- return
- if not doline(unet, line, outf.write):
- return
-
-
-def cli(
- unet,
- histfile=None,
- sockpath=None,
- force_window=False,
- title=None,
- prompt=None,
- background=True,
-):
- logger = logging.getLogger("cli-client")
-
- if prompt is None:
- prompt = "unet> "
-
- if force_window or not sys.stdin.isatty():
- # Run CLI in another window b/c we have no tty.
- sock, sockdir, sockpath = cli_server_setup(unet)
-
- python_path = unet.get_exec_path(["python3", "python"])
- us = os.path.realpath(__file__)
- cmd = "{} {}".format(python_path, us)
- if histfile:
- cmd += " --histfile=" + histfile
- if title:
- cmd += " --prompt={}".format(title)
- cmd += " " + sockpath
-
- try:
- unet.run_in_window(cmd, new_window=True, title=title, background=background)
- return cli_server(unet, sock)
- finally:
- unet.cmd_status("rm -rf " + sockdir)
-
- if not unet:
- logger.debug("client-cli using sockpath %s", sockpath)
-
- try:
- if histfile is None:
- histfile = os.path.expanduser("~/.micronet-history.txt")
- if not os.path.exists(histfile):
- if unet:
- unet.cmd("touch " + histfile)
- else:
- subprocess.run("touch " + histfile)
- if histfile:
- readline.read_history_file(histfile)
- except Exception:
- pass
-
- try:
- if sockpath:
- cli_client(sockpath, prompt=prompt)
- else:
- local_cli(unet, sys.stdout, prompt=prompt)
- except EOFError:
- pass
- except Exception as ex:
- logger.critical("cli: got exception: %s", ex, exc_info=True)
- raise
- finally:
- readline.write_history_file(histfile)
-
-
-if __name__ == "__main__":
- logging.basicConfig(level=logging.DEBUG, filename="/tmp/topotests/cli-client.log")
- logger = logging.getLogger("cli-client")
- logger.info("Start logging cli-client")
-
- parser = argparse.ArgumentParser()
- parser.add_argument("--histfile", help="file to user for history")
- parser.add_argument("--prompt-text", help="prompt string to use")
- parser.add_argument("socket", help="path to pair of sockets to communicate over")
- args = parser.parse_args()
-
- prompt = "{}> ".format(args.prompt_text) if args.prompt_text else "unet> "
- cli(None, args.histfile, args.socket, prompt=prompt)