Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

headers.py 16KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587
  1. from __future__ import annotations
  2. import base64
  3. import binascii
  4. import ipaddress
  5. import re
  6. from typing import Callable, List, Optional, Sequence, Tuple, TypeVar, cast
  7. from . import exceptions
  8. from .typing import (
  9. ConnectionOption,
  10. ExtensionHeader,
  11. ExtensionName,
  12. ExtensionParameter,
  13. Subprotocol,
  14. UpgradeProtocol,
  15. )
  16. __all__ = [
  17. "build_host",
  18. "parse_connection",
  19. "parse_upgrade",
  20. "parse_extension",
  21. "build_extension",
  22. "parse_subprotocol",
  23. "build_subprotocol",
  24. "validate_subprotocols",
  25. "build_www_authenticate_basic",
  26. "parse_authorization_basic",
  27. "build_authorization_basic",
  28. ]
  29. T = TypeVar("T")
  30. def build_host(host: str, port: int, secure: bool) -> str:
  31. """
  32. Build a ``Host`` header.
  33. """
  34. # https://www.rfc-editor.org/rfc/rfc3986.html#section-3.2.2
  35. # IPv6 addresses must be enclosed in brackets.
  36. try:
  37. address = ipaddress.ip_address(host)
  38. except ValueError:
  39. # host is a hostname
  40. pass
  41. else:
  42. # host is an IP address
  43. if address.version == 6:
  44. host = f"[{host}]"
  45. if port != (443 if secure else 80):
  46. host = f"{host}:{port}"
  47. return host
  48. # To avoid a dependency on a parsing library, we implement manually the ABNF
  49. # described in https://www.rfc-editor.org/rfc/rfc6455.html#section-9.1 and
  50. # https://www.rfc-editor.org/rfc/rfc7230.html#appendix-B.
  51. def peek_ahead(header: str, pos: int) -> Optional[str]:
  52. """
  53. Return the next character from ``header`` at the given position.
  54. Return :obj:`None` at the end of ``header``.
  55. We never need to peek more than one character ahead.
  56. """
  57. return None if pos == len(header) else header[pos]
  58. _OWS_re = re.compile(r"[\t ]*")
  59. def parse_OWS(header: str, pos: int) -> int:
  60. """
  61. Parse optional whitespace from ``header`` at the given position.
  62. Return the new position.
  63. The whitespace itself isn't returned because it isn't significant.
  64. """
  65. # There's always a match, possibly empty, whose content doesn't matter.
  66. match = _OWS_re.match(header, pos)
  67. assert match is not None
  68. return match.end()
  69. _token_re = re.compile(r"[-!#$%&\'*+.^_`|~0-9a-zA-Z]+")
  70. def parse_token(header: str, pos: int, header_name: str) -> Tuple[str, int]:
  71. """
  72. Parse a token from ``header`` at the given position.
  73. Return the token value and the new position.
  74. Raises:
  75. InvalidHeaderFormat: on invalid inputs.
  76. """
  77. match = _token_re.match(header, pos)
  78. if match is None:
  79. raise exceptions.InvalidHeaderFormat(header_name, "expected token", header, pos)
  80. return match.group(), match.end()
  81. _quoted_string_re = re.compile(
  82. r'"(?:[\x09\x20-\x21\x23-\x5b\x5d-\x7e]|\\[\x09\x20-\x7e\x80-\xff])*"'
  83. )
  84. _unquote_re = re.compile(r"\\([\x09\x20-\x7e\x80-\xff])")
  85. def parse_quoted_string(header: str, pos: int, header_name: str) -> Tuple[str, int]:
  86. """
  87. Parse a quoted string from ``header`` at the given position.
  88. Return the unquoted value and the new position.
  89. Raises:
  90. InvalidHeaderFormat: on invalid inputs.
  91. """
  92. match = _quoted_string_re.match(header, pos)
  93. if match is None:
  94. raise exceptions.InvalidHeaderFormat(
  95. header_name, "expected quoted string", header, pos
  96. )
  97. return _unquote_re.sub(r"\1", match.group()[1:-1]), match.end()
  98. _quotable_re = re.compile(r"[\x09\x20-\x7e\x80-\xff]*")
  99. _quote_re = re.compile(r"([\x22\x5c])")
  100. def build_quoted_string(value: str) -> str:
  101. """
  102. Format ``value`` as a quoted string.
  103. This is the reverse of :func:`parse_quoted_string`.
  104. """
  105. match = _quotable_re.fullmatch(value)
  106. if match is None:
  107. raise ValueError("invalid characters for quoted-string encoding")
  108. return '"' + _quote_re.sub(r"\\\1", value) + '"'
  109. def parse_list(
  110. parse_item: Callable[[str, int, str], Tuple[T, int]],
  111. header: str,
  112. pos: int,
  113. header_name: str,
  114. ) -> List[T]:
  115. """
  116. Parse a comma-separated list from ``header`` at the given position.
  117. This is appropriate for parsing values with the following grammar:
  118. 1#item
  119. ``parse_item`` parses one item.
  120. ``header`` is assumed not to start or end with whitespace.
  121. (This function is designed for parsing an entire header value and
  122. :func:`~websockets.http.read_headers` strips whitespace from values.)
  123. Return a list of items.
  124. Raises:
  125. InvalidHeaderFormat: on invalid inputs.
  126. """
  127. # Per https://www.rfc-editor.org/rfc/rfc7230.html#section-7, "a recipient
  128. # MUST parse and ignore a reasonable number of empty list elements";
  129. # hence while loops that remove extra delimiters.
  130. # Remove extra delimiters before the first item.
  131. while peek_ahead(header, pos) == ",":
  132. pos = parse_OWS(header, pos + 1)
  133. items = []
  134. while True:
  135. # Loop invariant: a item starts at pos in header.
  136. item, pos = parse_item(header, pos, header_name)
  137. items.append(item)
  138. pos = parse_OWS(header, pos)
  139. # We may have reached the end of the header.
  140. if pos == len(header):
  141. break
  142. # There must be a delimiter after each element except the last one.
  143. if peek_ahead(header, pos) == ",":
  144. pos = parse_OWS(header, pos + 1)
  145. else:
  146. raise exceptions.InvalidHeaderFormat(
  147. header_name, "expected comma", header, pos
  148. )
  149. # Remove extra delimiters before the next item.
  150. while peek_ahead(header, pos) == ",":
  151. pos = parse_OWS(header, pos + 1)
  152. # We may have reached the end of the header.
  153. if pos == len(header):
  154. break
  155. # Since we only advance in the header by one character with peek_ahead()
  156. # or with the end position of a regex match, we can't overshoot the end.
  157. assert pos == len(header)
  158. return items
  159. def parse_connection_option(
  160. header: str, pos: int, header_name: str
  161. ) -> Tuple[ConnectionOption, int]:
  162. """
  163. Parse a Connection option from ``header`` at the given position.
  164. Return the protocol value and the new position.
  165. Raises:
  166. InvalidHeaderFormat: on invalid inputs.
  167. """
  168. item, pos = parse_token(header, pos, header_name)
  169. return cast(ConnectionOption, item), pos
  170. def parse_connection(header: str) -> List[ConnectionOption]:
  171. """
  172. Parse a ``Connection`` header.
  173. Return a list of HTTP connection options.
  174. Args
  175. header: value of the ``Connection`` header.
  176. Raises:
  177. InvalidHeaderFormat: on invalid inputs.
  178. """
  179. return parse_list(parse_connection_option, header, 0, "Connection")
  180. _protocol_re = re.compile(
  181. r"[-!#$%&\'*+.^_`|~0-9a-zA-Z]+(?:/[-!#$%&\'*+.^_`|~0-9a-zA-Z]+)?"
  182. )
  183. def parse_upgrade_protocol(
  184. header: str, pos: int, header_name: str
  185. ) -> Tuple[UpgradeProtocol, int]:
  186. """
  187. Parse an Upgrade protocol from ``header`` at the given position.
  188. Return the protocol value and the new position.
  189. Raises:
  190. InvalidHeaderFormat: on invalid inputs.
  191. """
  192. match = _protocol_re.match(header, pos)
  193. if match is None:
  194. raise exceptions.InvalidHeaderFormat(
  195. header_name, "expected protocol", header, pos
  196. )
  197. return cast(UpgradeProtocol, match.group()), match.end()
  198. def parse_upgrade(header: str) -> List[UpgradeProtocol]:
  199. """
  200. Parse an ``Upgrade`` header.
  201. Return a list of HTTP protocols.
  202. Args:
  203. header: value of the ``Upgrade`` header.
  204. Raises:
  205. InvalidHeaderFormat: on invalid inputs.
  206. """
  207. return parse_list(parse_upgrade_protocol, header, 0, "Upgrade")
  208. def parse_extension_item_param(
  209. header: str, pos: int, header_name: str
  210. ) -> Tuple[ExtensionParameter, int]:
  211. """
  212. Parse a single extension parameter from ``header`` at the given position.
  213. Return a ``(name, value)`` pair and the new position.
  214. Raises:
  215. InvalidHeaderFormat: on invalid inputs.
  216. """
  217. # Extract parameter name.
  218. name, pos = parse_token(header, pos, header_name)
  219. pos = parse_OWS(header, pos)
  220. # Extract parameter value, if there is one.
  221. value: Optional[str] = None
  222. if peek_ahead(header, pos) == "=":
  223. pos = parse_OWS(header, pos + 1)
  224. if peek_ahead(header, pos) == '"':
  225. pos_before = pos # for proper error reporting below
  226. value, pos = parse_quoted_string(header, pos, header_name)
  227. # https://www.rfc-editor.org/rfc/rfc6455.html#section-9.1 says:
  228. # the value after quoted-string unescaping MUST conform to
  229. # the 'token' ABNF.
  230. if _token_re.fullmatch(value) is None:
  231. raise exceptions.InvalidHeaderFormat(
  232. header_name, "invalid quoted header content", header, pos_before
  233. )
  234. else:
  235. value, pos = parse_token(header, pos, header_name)
  236. pos = parse_OWS(header, pos)
  237. return (name, value), pos
  238. def parse_extension_item(
  239. header: str, pos: int, header_name: str
  240. ) -> Tuple[ExtensionHeader, int]:
  241. """
  242. Parse an extension definition from ``header`` at the given position.
  243. Return an ``(extension name, parameters)`` pair, where ``parameters`` is a
  244. list of ``(name, value)`` pairs, and the new position.
  245. Raises:
  246. InvalidHeaderFormat: on invalid inputs.
  247. """
  248. # Extract extension name.
  249. name, pos = parse_token(header, pos, header_name)
  250. pos = parse_OWS(header, pos)
  251. # Extract all parameters.
  252. parameters = []
  253. while peek_ahead(header, pos) == ";":
  254. pos = parse_OWS(header, pos + 1)
  255. parameter, pos = parse_extension_item_param(header, pos, header_name)
  256. parameters.append(parameter)
  257. return (cast(ExtensionName, name), parameters), pos
  258. def parse_extension(header: str) -> List[ExtensionHeader]:
  259. """
  260. Parse a ``Sec-WebSocket-Extensions`` header.
  261. Return a list of WebSocket extensions and their parameters in this format::
  262. [
  263. (
  264. 'extension name',
  265. [
  266. ('parameter name', 'parameter value'),
  267. ....
  268. ]
  269. ),
  270. ...
  271. ]
  272. Parameter values are :obj:`None` when no value is provided.
  273. Raises:
  274. InvalidHeaderFormat: on invalid inputs.
  275. """
  276. return parse_list(parse_extension_item, header, 0, "Sec-WebSocket-Extensions")
  277. parse_extension_list = parse_extension # alias for backwards compatibility
  278. def build_extension_item(
  279. name: ExtensionName, parameters: List[ExtensionParameter]
  280. ) -> str:
  281. """
  282. Build an extension definition.
  283. This is the reverse of :func:`parse_extension_item`.
  284. """
  285. return "; ".join(
  286. [cast(str, name)]
  287. + [
  288. # Quoted strings aren't necessary because values are always tokens.
  289. name if value is None else f"{name}={value}"
  290. for name, value in parameters
  291. ]
  292. )
  293. def build_extension(extensions: Sequence[ExtensionHeader]) -> str:
  294. """
  295. Build a ``Sec-WebSocket-Extensions`` header.
  296. This is the reverse of :func:`parse_extension`.
  297. """
  298. return ", ".join(
  299. build_extension_item(name, parameters) for name, parameters in extensions
  300. )
  301. build_extension_list = build_extension # alias for backwards compatibility
  302. def parse_subprotocol_item(
  303. header: str, pos: int, header_name: str
  304. ) -> Tuple[Subprotocol, int]:
  305. """
  306. Parse a subprotocol from ``header`` at the given position.
  307. Return the subprotocol value and the new position.
  308. Raises:
  309. InvalidHeaderFormat: on invalid inputs.
  310. """
  311. item, pos = parse_token(header, pos, header_name)
  312. return cast(Subprotocol, item), pos
  313. def parse_subprotocol(header: str) -> List[Subprotocol]:
  314. """
  315. Parse a ``Sec-WebSocket-Protocol`` header.
  316. Return a list of WebSocket subprotocols.
  317. Raises:
  318. InvalidHeaderFormat: on invalid inputs.
  319. """
  320. return parse_list(parse_subprotocol_item, header, 0, "Sec-WebSocket-Protocol")
  321. parse_subprotocol_list = parse_subprotocol # alias for backwards compatibility
  322. def build_subprotocol(subprotocols: Sequence[Subprotocol]) -> str:
  323. """
  324. Build a ``Sec-WebSocket-Protocol`` header.
  325. This is the reverse of :func:`parse_subprotocol`.
  326. """
  327. return ", ".join(subprotocols)
  328. build_subprotocol_list = build_subprotocol # alias for backwards compatibility
  329. def validate_subprotocols(subprotocols: Sequence[Subprotocol]) -> None:
  330. """
  331. Validate that ``subprotocols`` is suitable for :func:`build_subprotocol`.
  332. """
  333. if not isinstance(subprotocols, Sequence):
  334. raise TypeError("subprotocols must be a list")
  335. if isinstance(subprotocols, str):
  336. raise TypeError("subprotocols must be a list, not a str")
  337. for subprotocol in subprotocols:
  338. if not _token_re.fullmatch(subprotocol):
  339. raise ValueError(f"invalid subprotocol: {subprotocol}")
  340. def build_www_authenticate_basic(realm: str) -> str:
  341. """
  342. Build a ``WWW-Authenticate`` header for HTTP Basic Auth.
  343. Args:
  344. realm: identifier of the protection space.
  345. """
  346. # https://www.rfc-editor.org/rfc/rfc7617.html#section-2
  347. realm = build_quoted_string(realm)
  348. charset = build_quoted_string("UTF-8")
  349. return f"Basic realm={realm}, charset={charset}"
  350. _token68_re = re.compile(r"[A-Za-z0-9-._~+/]+=*")
  351. def parse_token68(header: str, pos: int, header_name: str) -> Tuple[str, int]:
  352. """
  353. Parse a token68 from ``header`` at the given position.
  354. Return the token value and the new position.
  355. Raises:
  356. InvalidHeaderFormat: on invalid inputs.
  357. """
  358. match = _token68_re.match(header, pos)
  359. if match is None:
  360. raise exceptions.InvalidHeaderFormat(
  361. header_name, "expected token68", header, pos
  362. )
  363. return match.group(), match.end()
  364. def parse_end(header: str, pos: int, header_name: str) -> None:
  365. """
  366. Check that parsing reached the end of header.
  367. """
  368. if pos < len(header):
  369. raise exceptions.InvalidHeaderFormat(header_name, "trailing data", header, pos)
  370. def parse_authorization_basic(header: str) -> Tuple[str, str]:
  371. """
  372. Parse an ``Authorization`` header for HTTP Basic Auth.
  373. Return a ``(username, password)`` tuple.
  374. Args:
  375. header: value of the ``Authorization`` header.
  376. Raises:
  377. InvalidHeaderFormat: on invalid inputs.
  378. InvalidHeaderValue: on unsupported inputs.
  379. """
  380. # https://www.rfc-editor.org/rfc/rfc7235.html#section-2.1
  381. # https://www.rfc-editor.org/rfc/rfc7617.html#section-2
  382. scheme, pos = parse_token(header, 0, "Authorization")
  383. if scheme.lower() != "basic":
  384. raise exceptions.InvalidHeaderValue(
  385. "Authorization",
  386. f"unsupported scheme: {scheme}",
  387. )
  388. if peek_ahead(header, pos) != " ":
  389. raise exceptions.InvalidHeaderFormat(
  390. "Authorization", "expected space after scheme", header, pos
  391. )
  392. pos += 1
  393. basic_credentials, pos = parse_token68(header, pos, "Authorization")
  394. parse_end(header, pos, "Authorization")
  395. try:
  396. user_pass = base64.b64decode(basic_credentials.encode()).decode()
  397. except binascii.Error:
  398. raise exceptions.InvalidHeaderValue(
  399. "Authorization",
  400. "expected base64-encoded credentials",
  401. ) from None
  402. try:
  403. username, password = user_pass.split(":", 1)
  404. except ValueError:
  405. raise exceptions.InvalidHeaderValue(
  406. "Authorization",
  407. "expected username:password credentials",
  408. ) from None
  409. return username, password
  410. def build_authorization_basic(username: str, password: str) -> str:
  411. """
  412. Build an ``Authorization`` header for HTTP Basic Auth.
  413. This is the reverse of :func:`parse_authorization_basic`.
  414. """
  415. # https://www.rfc-editor.org/rfc/rfc7617.html#section-2
  416. assert ":" not in username
  417. user_pass = f"{username}:{password}"
  418. basic_credentials = base64.b64encode(user_pass.encode()).decode()
  419. return "Basic " + basic_credentials