Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

_wsdump.py 6.9KB


  1. #!/usr/bin/env python3
  2. """
  3. wsdump.py
  4. websocket - WebSocket client library for Python
  5. Copyright 2023 engn33r
  6. Licensed under the Apache License, Version 2.0 (the "License");
  7. you may not use this file except in compliance with the License.
  8. You may obtain a copy of the License at
  9. http://www.apache.org/licenses/LICENSE-2.0
  10. Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is distributed on an "AS IS" BASIS,
  12. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. See the License for the specific language governing permissions and
  14. limitations under the License.
  15. """
  16. import argparse
  17. import code
  18. import sys
  19. import threading
  20. import time
  21. import ssl
  22. import gzip
  23. import zlib
  24. from urllib.parse import urlparse
  25. import websocket
  26. try:
  27. import readline
  28. except ImportError:
  29. pass
  30. def get_encoding() -> str:
  31. encoding = getattr(sys.stdin, "encoding", "")
  32. if not encoding:
  33. return "utf-8"
  34. else:
  35. return encoding.lower()
  36. OPCODE_DATA = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY)
  37. ENCODING = get_encoding()
  38. class VAction(argparse.Action):
  39. def __call__(self, parser: argparse.Namespace, args: tuple, values: str, option_string: str = None) -> None:
  40. if values is None:
  41. values = "1"
  42. try:
  43. values = int(values)
  44. except ValueError:
  45. values = values.count("v") + 1
  46. setattr(args, self.dest, values)
  47. def parse_args() -> argparse.Namespace:
  48. parser = argparse.ArgumentParser(description="WebSocket Simple Dump Tool")
  49. parser.add_argument("url", metavar="ws_url",
  50. help="websocket url. ex. ws://echo.websocket.events/")
  51. parser.add_argument("-p", "--proxy",
  52. help="proxy url. ex. http://127.0.0.1:8080")
  53. parser.add_argument("-v", "--verbose", default=0, nargs='?', action=VAction,
  54. dest="verbose",
  55. help="set verbose mode. If set to 1, show opcode. "
  56. "If set to 2, enable to trace websocket module")
  57. parser.add_argument("-n", "--nocert", action='store_true',
  58. help="Ignore invalid SSL cert")
  59. parser.add_argument("-r", "--raw", action="store_true",
  60. help="raw output")
  61. parser.add_argument("-s", "--subprotocols", nargs='*',
  62. help="Set subprotocols")
  63. parser.add_argument("-o", "--origin",
  64. help="Set origin")
  65. parser.add_argument("--eof-wait", default=0, type=int,
  66. help="wait time(second) after 'EOF' received.")
  67. parser.add_argument("-t", "--text",
  68. help="Send initial text")
  69. parser.add_argument("--timings", action="store_true",
  70. help="Print timings in seconds")
  71. parser.add_argument("--headers",
  72. help="Set custom headers. Use ',' as separator")
  73. return parser.parse_args()
  74. class RawInput:
  75. def raw_input(self, prompt: str = "") -> str:
  76. line = input(prompt)
  77. if ENCODING and ENCODING != "utf-8" and not isinstance(line, str):
  78. line = line.decode(ENCODING).encode("utf-8")
  79. elif isinstance(line, str):
  80. line = line.encode("utf-8")
  81. return line
  82. class InteractiveConsole(RawInput, code.InteractiveConsole):
  83. def write(self, data: str) -> None:
  84. sys.stdout.write("\033[2K\033[E")
  85. # sys.stdout.write("\n")
  86. sys.stdout.write("\033[34m< " + data + "\033[39m")
  87. sys.stdout.write("\n> ")
  88. sys.stdout.flush()
  89. def read(self) -> str:
  90. return self.raw_input("> ")
  91. class NonInteractive(RawInput):
  92. def write(self, data: str) -> None:
  93. sys.stdout.write(data)
  94. sys.stdout.write("\n")
  95. sys.stdout.flush()
  96. def read(self) -> str:
  97. return self.raw_input("")
  98. def main() -> None:
  99. start_time = time.time()
  100. args = parse_args()
  101. if args.verbose > 1:
  102. websocket.enableTrace(True)
  103. options = {}
  104. if args.proxy:
  105. p = urlparse(args.proxy)
  106. options["http_proxy_host"] = p.hostname
  107. options["http_proxy_port"] = p.port
  108. if args.origin:
  109. options["origin"] = args.origin
  110. if args.subprotocols:
  111. options["subprotocols"] = args.subprotocols
  112. opts = {}
  113. if args.nocert:
  114. opts = {"cert_reqs": ssl.CERT_NONE, "check_hostname": False}
  115. if args.headers:
  116. options['header'] = list(map(str.strip, args.headers.split(',')))
  117. ws = websocket.create_connection(args.url, sslopt=opts, **options)
  118. if args.raw:
  119. console = NonInteractive()
  120. else:
  121. console = InteractiveConsole()
  122. print("Press Ctrl+C to quit")
  123. def recv() -> tuple:
  124. try:
  125. frame = ws.recv_frame()
  126. except websocket.WebSocketException:
  127. return websocket.ABNF.OPCODE_CLOSE, ""
  128. if not frame:
  129. raise websocket.WebSocketException("Not a valid frame {frame}".format(frame=frame))
  130. elif frame.opcode in OPCODE_DATA:
  131. return frame.opcode, frame.data
  132. elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
  133. ws.send_close()
  134. return frame.opcode, ""
  135. elif frame.opcode == websocket.ABNF.OPCODE_PING:
  136. ws.pong(frame.data)
  137. return frame.opcode, frame.data
  138. return frame.opcode, frame.data
  139. def recv_ws() -> None:
  140. while True:
  141. opcode, data = recv()
  142. msg = None
  143. if opcode == websocket.ABNF.OPCODE_TEXT and isinstance(data, bytes):
  144. data = str(data, "utf-8")
  145. if isinstance(data, bytes) and len(data) > 2 and data[:2] == b'\037\213': # gzip magick
  146. try:
  147. data = "[gzip] " + str(gzip.decompress(data), "utf-8")
  148. except:
  149. pass
  150. elif isinstance(data, bytes):
  151. try:
  152. data = "[zlib] " + str(zlib.decompress(data, -zlib.MAX_WBITS), "utf-8")
  153. except:
  154. pass
  155. if isinstance(data, bytes):
  156. data = repr(data)
  157. if args.verbose:
  158. msg = "{opcode}: {data}".format(opcode=websocket.ABNF.OPCODE_MAP.get(opcode), data=data)
  159. else:
  160. msg = data
  161. if msg is not None:
  162. if args.timings:
  163. console.write(str(time.time() - start_time) + ": " + msg)
  164. else:
  165. console.write(msg)
  166. if opcode == websocket.ABNF.OPCODE_CLOSE:
  167. break
  168. thread = threading.Thread(target=recv_ws)
  169. thread.daemon = True
  170. thread.start()
  171. if args.text:
  172. ws.send(args.text)
  173. while True:
  174. try:
  175. message = console.read()
  176. ws.send(message)
  177. except KeyboardInterrupt:
  178. return
  179. except EOFError:
  180. time.sleep(args.eof_wait)
  181. return
  182. if __name__ == "__main__":
  183. try:
  184. main()
  185. except Exception as e:
  186. print(e)