Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

_collections.py 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. from __future__ import annotations
  2. import typing
  3. from collections import OrderedDict
  4. from enum import Enum, auto
  5. from threading import RLock
  6. if typing.TYPE_CHECKING:
  7. # We can only import Protocol if TYPE_CHECKING because it's a development
  8. # dependency, and is not available at runtime.
  9. from typing_extensions import Protocol
  10. class HasGettableStringKeys(Protocol):
  11. def keys(self) -> typing.Iterator[str]:
  12. ...
  13. def __getitem__(self, key: str) -> str:
  14. ...
  15. __all__ = ["RecentlyUsedContainer", "HTTPHeaderDict"]
  16. # Key type
  17. _KT = typing.TypeVar("_KT")
  18. # Value type
  19. _VT = typing.TypeVar("_VT")
  20. # Default type
  21. _DT = typing.TypeVar("_DT")
  22. ValidHTTPHeaderSource = typing.Union[
  23. "HTTPHeaderDict",
  24. typing.Mapping[str, str],
  25. typing.Iterable[typing.Tuple[str, str]],
  26. "HasGettableStringKeys",
  27. ]
  28. class _Sentinel(Enum):
  29. not_passed = auto()
  30. def ensure_can_construct_http_header_dict(
  31. potential: object,
  32. ) -> ValidHTTPHeaderSource | None:
  33. if isinstance(potential, HTTPHeaderDict):
  34. return potential
  35. elif isinstance(potential, typing.Mapping):
  36. # Full runtime checking of the contents of a Mapping is expensive, so for the
  37. # purposes of typechecking, we assume that any Mapping is the right shape.
  38. return typing.cast(typing.Mapping[str, str], potential)
  39. elif isinstance(potential, typing.Iterable):
  40. # Similarly to Mapping, full runtime checking of the contents of an Iterable is
  41. # expensive, so for the purposes of typechecking, we assume that any Iterable
  42. # is the right shape.
  43. return typing.cast(typing.Iterable[typing.Tuple[str, str]], potential)
  44. elif hasattr(potential, "keys") and hasattr(potential, "__getitem__"):
  45. return typing.cast("HasGettableStringKeys", potential)
  46. else:
  47. return None
  48. class RecentlyUsedContainer(typing.Generic[_KT, _VT], typing.MutableMapping[_KT, _VT]):
  49. """
  50. Provides a thread-safe dict-like container which maintains up to
  51. ``maxsize`` keys while throwing away the least-recently-used keys beyond
  52. ``maxsize``.
  53. :param maxsize:
  54. Maximum number of recent elements to retain.
  55. :param dispose_func:
  56. Every time an item is evicted from the container,
  57. ``dispose_func(value)`` is called. Callback which will get called
  58. """
  59. _container: typing.OrderedDict[_KT, _VT]
  60. _maxsize: int
  61. dispose_func: typing.Callable[[_VT], None] | None
  62. lock: RLock
  63. def __init__(
  64. self,
  65. maxsize: int = 10,
  66. dispose_func: typing.Callable[[_VT], None] | None = None,
  67. ) -> None:
  68. super().__init__()
  69. self._maxsize = maxsize
  70. self.dispose_func = dispose_func
  71. self._container = OrderedDict()
  72. self.lock = RLock()
  73. def __getitem__(self, key: _KT) -> _VT:
  74. # Re-insert the item, moving it to the end of the eviction line.
  75. with self.lock:
  76. item = self._container.pop(key)
  77. self._container[key] = item
  78. return item
  79. def __setitem__(self, key: _KT, value: _VT) -> None:
  80. evicted_item = None
  81. with self.lock:
  82. # Possibly evict the existing value of 'key'
  83. try:
  84. # If the key exists, we'll overwrite it, which won't change the
  85. # size of the pool. Because accessing a key should move it to
  86. # the end of the eviction line, we pop it out first.
  87. evicted_item = key, self._container.pop(key)
  88. self._container[key] = value
  89. except KeyError:
  90. # When the key does not exist, we insert the value first so that
  91. # evicting works in all cases, including when self._maxsize is 0
  92. self._container[key] = value
  93. if len(self._container) > self._maxsize:
  94. # If we didn't evict an existing value, and we've hit our maximum
  95. # size, then we have to evict the least recently used item from
  96. # the beginning of the container.
  97. evicted_item = self._container.popitem(last=False)
  98. # After releasing the lock on the pool, dispose of any evicted value.
  99. if evicted_item is not None and self.dispose_func:
  100. _, evicted_value = evicted_item
  101. self.dispose_func(evicted_value)
  102. def __delitem__(self, key: _KT) -> None:
  103. with self.lock:
  104. value = self._container.pop(key)
  105. if self.dispose_func:
  106. self.dispose_func(value)
  107. def __len__(self) -> int:
  108. with self.lock:
  109. return len(self._container)
  110. def __iter__(self) -> typing.NoReturn:
  111. raise NotImplementedError(
  112. "Iteration over this class is unlikely to be threadsafe."
  113. )
  114. def clear(self) -> None:
  115. with self.lock:
  116. # Copy pointers to all values, then wipe the mapping
  117. values = list(self._container.values())
  118. self._container.clear()
  119. if self.dispose_func:
  120. for value in values:
  121. self.dispose_func(value)
  122. def keys(self) -> set[_KT]: # type: ignore[override]
  123. with self.lock:
  124. return set(self._container.keys())
  125. class HTTPHeaderDictItemView(typing.Set[typing.Tuple[str, str]]):
  126. """
  127. HTTPHeaderDict is unusual for a Mapping[str, str] in that it has two modes of
  128. address.
  129. If we directly try to get an item with a particular name, we will get a string
  130. back that is the concatenated version of all the values:
  131. >>> d['X-Header-Name']
  132. 'Value1, Value2, Value3'
  133. However, if we iterate over an HTTPHeaderDict's items, we will optionally combine
  134. these values based on whether combine=True was called when building up the dictionary
  135. >>> d = HTTPHeaderDict({"A": "1", "B": "foo"})
  136. >>> d.add("A", "2", combine=True)
  137. >>> d.add("B", "bar")
  138. >>> list(d.items())
  139. [
  140. ('A', '1, 2'),
  141. ('B', 'foo'),
  142. ('B', 'bar'),
  143. ]
  144. This class conforms to the interface required by the MutableMapping ABC while
  145. also giving us the nonstandard iteration behavior we want; items with duplicate
  146. keys, ordered by time of first insertion.
  147. """
  148. _headers: HTTPHeaderDict
  149. def __init__(self, headers: HTTPHeaderDict) -> None:
  150. self._headers = headers
  151. def __len__(self) -> int:
  152. return len(list(self._headers.iteritems()))
  153. def __iter__(self) -> typing.Iterator[tuple[str, str]]:
  154. return self._headers.iteritems()
  155. def __contains__(self, item: object) -> bool:
  156. if isinstance(item, tuple) and len(item) == 2:
  157. passed_key, passed_val = item
  158. if isinstance(passed_key, str) and isinstance(passed_val, str):
  159. return self._headers._has_value_for_header(passed_key, passed_val)
  160. return False
  161. class HTTPHeaderDict(typing.MutableMapping[str, str]):
  162. """
  163. :param headers:
  164. An iterable of field-value pairs. Must not contain multiple field names
  165. when compared case-insensitively.
  166. :param kwargs:
  167. Additional field-value pairs to pass in to ``dict.update``.
  168. A ``dict`` like container for storing HTTP Headers.
  169. Field names are stored and compared case-insensitively in compliance with
  170. RFC 7230. Iteration provides the first case-sensitive key seen for each
  171. case-insensitive pair.
  172. Using ``__setitem__`` syntax overwrites fields that compare equal
  173. case-insensitively in order to maintain ``dict``'s api. For fields that
  174. compare equal, instead create a new ``HTTPHeaderDict`` and use ``.add``
  175. in a loop.
  176. If multiple fields that are equal case-insensitively are passed to the
  177. constructor or ``.update``, the behavior is undefined and some will be
  178. lost.
  179. >>> headers = HTTPHeaderDict()
  180. >>> headers.add('Set-Cookie', 'foo=bar')
  181. >>> headers.add('set-cookie', 'baz=quxx')
  182. >>> headers['content-length'] = '7'
  183. >>> headers['SET-cookie']
  184. 'foo=bar, baz=quxx'
  185. >>> headers['Content-Length']
  186. '7'
  187. """
  188. _container: typing.MutableMapping[str, list[str]]
  189. def __init__(self, headers: ValidHTTPHeaderSource | None = None, **kwargs: str):
  190. super().__init__()
  191. self._container = {} # 'dict' is insert-ordered in Python 3.7+
  192. if headers is not None:
  193. if isinstance(headers, HTTPHeaderDict):
  194. self._copy_from(headers)
  195. else:
  196. self.extend(headers)
  197. if kwargs:
  198. self.extend(kwargs)
  199. def __setitem__(self, key: str, val: str) -> None:
  200. # avoid a bytes/str comparison by decoding before httplib
  201. if isinstance(key, bytes):
  202. key = key.decode("latin-1")
  203. self._container[key.lower()] = [key, val]
  204. def __getitem__(self, key: str) -> str:
  205. val = self._container[key.lower()]
  206. return ", ".join(val[1:])
  207. def __delitem__(self, key: str) -> None:
  208. del self._container[key.lower()]
  209. def __contains__(self, key: object) -> bool:
  210. if isinstance(key, str):
  211. return key.lower() in self._container
  212. return False
  213. def setdefault(self, key: str, default: str = "") -> str:
  214. return super().setdefault(key, default)
  215. def __eq__(self, other: object) -> bool:
  216. maybe_constructable = ensure_can_construct_http_header_dict(other)
  217. if maybe_constructable is None:
  218. return False
  219. else:
  220. other_as_http_header_dict = type(self)(maybe_constructable)
  221. return {k.lower(): v for k, v in self.itermerged()} == {
  222. k.lower(): v for k, v in other_as_http_header_dict.itermerged()
  223. }
  224. def __ne__(self, other: object) -> bool:
  225. return not self.__eq__(other)
  226. def __len__(self) -> int:
  227. return len(self._container)
  228. def __iter__(self) -> typing.Iterator[str]:
  229. # Only provide the originally cased names
  230. for vals in self._container.values():
  231. yield vals[0]
  232. def discard(self, key: str) -> None:
  233. try:
  234. del self[key]
  235. except KeyError:
  236. pass
  237. def add(self, key: str, val: str, *, combine: bool = False) -> None:
  238. """Adds a (name, value) pair, doesn't overwrite the value if it already
  239. exists.
  240. If this is called with combine=True, instead of adding a new header value
  241. as a distinct item during iteration, this will instead append the value to
  242. any existing header value with a comma. If no existing header value exists
  243. for the key, then the value will simply be added, ignoring the combine parameter.
  244. >>> headers = HTTPHeaderDict(foo='bar')
  245. >>> headers.add('Foo', 'baz')
  246. >>> headers['foo']
  247. 'bar, baz'
  248. >>> list(headers.items())
  249. [('foo', 'bar'), ('foo', 'baz')]
  250. >>> headers.add('foo', 'quz', combine=True)
  251. >>> list(headers.items())
  252. [('foo', 'bar, baz, quz')]
  253. """
  254. # avoid a bytes/str comparison by decoding before httplib
  255. if isinstance(key, bytes):
  256. key = key.decode("latin-1")
  257. key_lower = key.lower()
  258. new_vals = [key, val]
  259. # Keep the common case aka no item present as fast as possible
  260. vals = self._container.setdefault(key_lower, new_vals)
  261. if new_vals is not vals:
  262. # if there are values here, then there is at least the initial
  263. # key/value pair
  264. assert len(vals) >= 2
  265. if combine:
  266. vals[-1] = vals[-1] + ", " + val
  267. else:
  268. vals.append(val)
  269. def extend(self, *args: ValidHTTPHeaderSource, **kwargs: str) -> None:
  270. """Generic import function for any type of header-like object.
  271. Adapted version of MutableMapping.update in order to insert items
  272. with self.add instead of self.__setitem__
  273. """
  274. if len(args) > 1:
  275. raise TypeError(
  276. f"extend() takes at most 1 positional arguments ({len(args)} given)"
  277. )
  278. other = args[0] if len(args) >= 1 else ()
  279. if isinstance(other, HTTPHeaderDict):
  280. for key, val in other.iteritems():
  281. self.add(key, val)
  282. elif isinstance(other, typing.Mapping):
  283. for key, val in other.items():
  284. self.add(key, val)
  285. elif isinstance(other, typing.Iterable):
  286. other = typing.cast(typing.Iterable[typing.Tuple[str, str]], other)
  287. for key, value in other:
  288. self.add(key, value)
  289. elif hasattr(other, "keys") and hasattr(other, "__getitem__"):
  290. # THIS IS NOT A TYPESAFE BRANCH
  291. # In this branch, the object has a `keys` attr but is not a Mapping or any of
  292. # the other types indicated in the method signature. We do some stuff with
  293. # it as though it partially implements the Mapping interface, but we're not
  294. # doing that stuff safely AT ALL.
  295. for key in other.keys():
  296. self.add(key, other[key])
  297. for key, value in kwargs.items():
  298. self.add(key, value)
  299. @typing.overload
  300. def getlist(self, key: str) -> list[str]:
  301. ...
  302. @typing.overload
  303. def getlist(self, key: str, default: _DT) -> list[str] | _DT:
  304. ...
  305. def getlist(
  306. self, key: str, default: _Sentinel | _DT = _Sentinel.not_passed
  307. ) -> list[str] | _DT:
  308. """Returns a list of all the values for the named field. Returns an
  309. empty list if the key doesn't exist."""
  310. try:
  311. vals = self._container[key.lower()]
  312. except KeyError:
  313. if default is _Sentinel.not_passed:
  314. # _DT is unbound; empty list is instance of List[str]
  315. return []
  316. # _DT is bound; default is instance of _DT
  317. return default
  318. else:
  319. # _DT may or may not be bound; vals[1:] is instance of List[str], which
  320. # meets our external interface requirement of `Union[List[str], _DT]`.
  321. return vals[1:]
  322. # Backwards compatibility for httplib
  323. getheaders = getlist
  324. getallmatchingheaders = getlist
  325. iget = getlist
  326. # Backwards compatibility for http.cookiejar
  327. get_all = getlist
  328. def __repr__(self) -> str:
  329. return f"{type(self).__name__}({dict(self.itermerged())})"
  330. def _copy_from(self, other: HTTPHeaderDict) -> None:
  331. for key in other:
  332. val = other.getlist(key)
  333. self._container[key.lower()] = [key, *val]
  334. def copy(self) -> HTTPHeaderDict:
  335. clone = type(self)()
  336. clone._copy_from(self)
  337. return clone
  338. def iteritems(self) -> typing.Iterator[tuple[str, str]]:
  339. """Iterate over all header lines, including duplicate ones."""
  340. for key in self:
  341. vals = self._container[key.lower()]
  342. for val in vals[1:]:
  343. yield vals[0], val
  344. def itermerged(self) -> typing.Iterator[tuple[str, str]]:
  345. """Iterate over all headers, merging duplicate ones together."""
  346. for key in self:
  347. val = self._container[key.lower()]
  348. yield val[0], ", ".join(val[1:])
  349. def items(self) -> HTTPHeaderDictItemView: # type: ignore[override]
  350. return HTTPHeaderDictItemView(self)
  351. def _has_value_for_header(self, header_name: str, potential_value: str) -> bool:
  352. if header_name in self:
  353. return potential_value in self._container[header_name.lower()][1:]
  354. return False