Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

_methodical.py 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. # -*- test-case-name: automat._test.test_methodical -*-
  2. import collections
  3. from functools import wraps
  4. from itertools import count
  5. from inspect import getfullargspec as getArgsSpec
  6. import attr
  7. from ._core import Transitioner, Automaton
  8. from ._introspection import preserveName
  9. ArgSpec = collections.namedtuple('ArgSpec', ['args', 'varargs', 'varkw',
  10. 'defaults', 'kwonlyargs',
  11. 'kwonlydefaults', 'annotations'])
  12. def _getArgSpec(func):
  13. """
  14. Normalize inspect.ArgSpec across python versions
  15. and convert mutable attributes to immutable types.
  16. :param Callable func: A function.
  17. :return: The function's ArgSpec.
  18. :rtype: ArgSpec
  19. """
  20. spec = getArgsSpec(func)
  21. return ArgSpec(
  22. args=tuple(spec.args),
  23. varargs=spec.varargs,
  24. varkw=spec.varkw,
  25. defaults=spec.defaults if spec.defaults else (),
  26. kwonlyargs=tuple(spec.kwonlyargs),
  27. kwonlydefaults=(
  28. tuple(spec.kwonlydefaults.items())
  29. if spec.kwonlydefaults else ()
  30. ),
  31. annotations=tuple(spec.annotations.items()),
  32. )
  33. def _getArgNames(spec):
  34. """
  35. Get the name of all arguments defined in a function signature.
  36. The name of * and ** arguments is normalized to "*args" and "**kwargs".
  37. :param ArgSpec spec: A function to interrogate for a signature.
  38. :return: The set of all argument names in `func`s signature.
  39. :rtype: Set[str]
  40. """
  41. return set(
  42. spec.args
  43. + spec.kwonlyargs
  44. + (('*args',) if spec.varargs else ())
  45. + (('**kwargs',) if spec.varkw else ())
  46. + spec.annotations
  47. )
  48. def _keywords_only(f):
  49. """
  50. Decorate a function so all its arguments must be passed by keyword.
  51. A useful utility for decorators that take arguments so that they don't
  52. accidentally get passed the thing they're decorating as their first
  53. argument.
  54. Only works for methods right now.
  55. """
  56. @wraps(f)
  57. def g(self, **kw):
  58. return f(self, **kw)
  59. return g
  60. @attr.s(frozen=True)
  61. class MethodicalState(object):
  62. """
  63. A state for a L{MethodicalMachine}.
  64. """
  65. machine = attr.ib(repr=False)
  66. method = attr.ib()
  67. serialized = attr.ib(repr=False)
  68. def upon(self, input, enter=None, outputs=None, collector=list):
  69. """
  70. Declare a state transition within the :class:`automat.MethodicalMachine`
  71. associated with this :class:`automat.MethodicalState`:
  72. upon the receipt of the `input`, enter the `state`,
  73. emitting each output in `outputs`.
  74. :param MethodicalInput input: The input triggering a state transition.
  75. :param MethodicalState enter: The resulting state.
  76. :param Iterable[MethodicalOutput] outputs: The outputs to be triggered
  77. as a result of the declared state transition.
  78. :param Callable collector: The function to be used when collecting
  79. output return values.
  80. :raises TypeError: if any of the `outputs` signatures do not match
  81. the `inputs` signature.
  82. :raises ValueError: if the state transition from `self` via `input`
  83. has already been defined.
  84. """
  85. if enter is None:
  86. enter = self
  87. if outputs is None:
  88. outputs = []
  89. inputArgs = _getArgNames(input.argSpec)
  90. for output in outputs:
  91. outputArgs = _getArgNames(output.argSpec)
  92. if not outputArgs.issubset(inputArgs):
  93. raise TypeError(
  94. "method {input} signature {inputSignature} "
  95. "does not match output {output} "
  96. "signature {outputSignature}".format(
  97. input=input.method.__name__,
  98. output=output.method.__name__,
  99. inputSignature=getArgsSpec(input.method),
  100. outputSignature=getArgsSpec(output.method),
  101. ))
  102. self.machine._oneTransition(self, input, enter, outputs, collector)
  103. def _name(self):
  104. return self.method.__name__
  105. def _transitionerFromInstance(oself, symbol, automaton):
  106. """
  107. Get a L{Transitioner}
  108. """
  109. transitioner = getattr(oself, symbol, None)
  110. if transitioner is None:
  111. transitioner = Transitioner(
  112. automaton,
  113. automaton.initialState,
  114. )
  115. setattr(oself, symbol, transitioner)
  116. return transitioner
  117. def _empty():
  118. pass
  119. def _docstring():
  120. """docstring"""
  121. def assertNoCode(inst, attribute, f):
  122. # The function body must be empty, i.e. "pass" or "return None", which
  123. # both yield the same bytecode: LOAD_CONST (None), RETURN_VALUE. We also
  124. # accept functions with only a docstring, which yields slightly different
  125. # bytecode, because the "None" is put in a different constant slot.
  126. # Unfortunately, this does not catch function bodies that return a
  127. # constant value, e.g. "return 1", because their code is identical to a
  128. # "return None". They differ in the contents of their constant table, but
  129. # checking that would require us to parse the bytecode, find the index
  130. # being returned, then making sure the table has a None at that index.
  131. if f.__code__.co_code not in (_empty.__code__.co_code,
  132. _docstring.__code__.co_code):
  133. raise ValueError("function body must be empty")
  134. def _filterArgs(args, kwargs, inputSpec, outputSpec):
  135. """
  136. Filter out arguments that were passed to input that output won't accept.
  137. :param tuple args: The *args that input received.
  138. :param dict kwargs: The **kwargs that input received.
  139. :param ArgSpec inputSpec: The input's arg spec.
  140. :param ArgSpec outputSpec: The output's arg spec.
  141. :return: The args and kwargs that output will accept.
  142. :rtype: Tuple[tuple, dict]
  143. """
  144. named_args = tuple(zip(inputSpec.args[1:], args))
  145. if outputSpec.varargs:
  146. # Only return all args if the output accepts *args.
  147. return_args = args
  148. else:
  149. # Filter out arguments that don't appear
  150. # in the output's method signature.
  151. return_args = [v for n, v in named_args if n in outputSpec.args]
  152. # Get any of input's default arguments that were not passed.
  153. passed_arg_names = tuple(kwargs)
  154. for name, value in named_args:
  155. passed_arg_names += (name, value)
  156. defaults = zip(inputSpec.args[::-1], inputSpec.defaults[::-1])
  157. full_kwargs = {n: v for n, v in defaults if n not in passed_arg_names}
  158. full_kwargs.update(kwargs)
  159. if outputSpec.varkw:
  160. # Only pass all kwargs if the output method accepts **kwargs.
  161. return_kwargs = full_kwargs
  162. else:
  163. # Filter out names that the output method does not accept.
  164. all_accepted_names = outputSpec.args[1:] + outputSpec.kwonlyargs
  165. return_kwargs = {n: v for n, v in full_kwargs.items()
  166. if n in all_accepted_names}
  167. return return_args, return_kwargs
  168. @attr.s(eq=False, hash=False)
  169. class MethodicalInput(object):
  170. """
  171. An input for a L{MethodicalMachine}.
  172. """
  173. automaton = attr.ib(repr=False)
  174. method = attr.ib(validator=assertNoCode)
  175. symbol = attr.ib(repr=False)
  176. collectors = attr.ib(default=attr.Factory(dict), repr=False)
  177. argSpec = attr.ib(init=False, repr=False)
  178. @argSpec.default
  179. def _buildArgSpec(self):
  180. return _getArgSpec(self.method)
  181. def __get__(self, oself, type=None):
  182. """
  183. Return a function that takes no arguments and returns values returned
  184. by output functions produced by the given L{MethodicalInput} in
  185. C{oself}'s current state.
  186. """
  187. transitioner = _transitionerFromInstance(oself, self.symbol,
  188. self.automaton)
  189. @preserveName(self.method)
  190. @wraps(self.method)
  191. def doInput(*args, **kwargs):
  192. self.method(oself, *args, **kwargs)
  193. previousState = transitioner._state
  194. (outputs, outTracer) = transitioner.transition(self)
  195. collector = self.collectors[previousState]
  196. values = []
  197. for output in outputs:
  198. if outTracer:
  199. outTracer(output._name())
  200. a, k = _filterArgs(args, kwargs, self.argSpec, output.argSpec)
  201. value = output(oself, *a, **k)
  202. values.append(value)
  203. return collector(values)
  204. return doInput
  205. def _name(self):
  206. return self.method.__name__
  207. @attr.s(frozen=True)
  208. class MethodicalOutput(object):
  209. """
  210. An output for a L{MethodicalMachine}.
  211. """
  212. machine = attr.ib(repr=False)
  213. method = attr.ib()
  214. argSpec = attr.ib(init=False, repr=False)
  215. @argSpec.default
  216. def _buildArgSpec(self):
  217. return _getArgSpec(self.method)
  218. def __get__(self, oself, type=None):
  219. """
  220. Outputs are private, so raise an exception when we attempt to get one.
  221. """
  222. raise AttributeError(
  223. "{cls}.{method} is a state-machine output method; "
  224. "to produce this output, call an input method instead.".format(
  225. cls=type.__name__,
  226. method=self.method.__name__
  227. )
  228. )
  229. def __call__(self, oself, *args, **kwargs):
  230. """
  231. Call the underlying method.
  232. """
  233. return self.method(oself, *args, **kwargs)
  234. def _name(self):
  235. return self.method.__name__
  236. @attr.s(eq=False, hash=False)
  237. class MethodicalTracer(object):
  238. automaton = attr.ib(repr=False)
  239. symbol = attr.ib(repr=False)
  240. def __get__(self, oself, type=None):
  241. transitioner = _transitionerFromInstance(oself, self.symbol,
  242. self.automaton)
  243. def setTrace(tracer):
  244. transitioner.setTrace(tracer)
  245. return setTrace
  246. counter = count()
  247. def gensym():
  248. """
  249. Create a unique Python identifier.
  250. """
  251. return "_symbol_" + str(next(counter))
  252. class MethodicalMachine(object):
  253. """
  254. A :class:`MethodicalMachine` is an interface to an `Automaton`
  255. that uses methods on a class.
  256. """
  257. def __init__(self):
  258. self._automaton = Automaton()
  259. self._reducers = {}
  260. self._symbol = gensym()
  261. def __get__(self, oself, type=None):
  262. """
  263. L{MethodicalMachine} is an implementation detail for setting up
  264. class-level state; applications should never need to access it on an
  265. instance.
  266. """
  267. if oself is not None:
  268. raise AttributeError(
  269. "MethodicalMachine is an implementation detail.")
  270. return self
  271. @_keywords_only
  272. def state(self, initial=False, terminal=False,
  273. serialized=None):
  274. """
  275. Declare a state, possibly an initial state or a terminal state.
  276. This is a decorator for methods, but it will modify the method so as
  277. not to be callable any more.
  278. :param bool initial: is this state the initial state?
  279. Only one state on this :class:`automat.MethodicalMachine`
  280. may be an initial state; more than one is an error.
  281. :param bool terminal: Is this state a terminal state?
  282. i.e. a state that the machine can end up in?
  283. (This is purely informational at this point.)
  284. :param Hashable serialized: a serializable value
  285. to be used to represent this state to external systems.
  286. This value should be hashable;
  287. :py:func:`unicode` is a good type to use.
  288. """
  289. def decorator(stateMethod):
  290. state = MethodicalState(machine=self,
  291. method=stateMethod,
  292. serialized=serialized)
  293. if initial:
  294. self._automaton.initialState = state
  295. return state
  296. return decorator
  297. @_keywords_only
  298. def input(self):
  299. """
  300. Declare an input.
  301. This is a decorator for methods.
  302. """
  303. def decorator(inputMethod):
  304. return MethodicalInput(automaton=self._automaton,
  305. method=inputMethod,
  306. symbol=self._symbol)
  307. return decorator
  308. @_keywords_only
  309. def output(self):
  310. """
  311. Declare an output.
  312. This is a decorator for methods.
  313. This method will be called when the state machine transitions to this
  314. state as specified in the decorated `output` method.
  315. """
  316. def decorator(outputMethod):
  317. return MethodicalOutput(machine=self, method=outputMethod)
  318. return decorator
  319. def _oneTransition(self, startState, inputToken, endState, outputTokens,
  320. collector):
  321. """
  322. See L{MethodicalState.upon}.
  323. """
  324. # FIXME: tests for all of this (some of it is wrong)
  325. # if not isinstance(startState, MethodicalState):
  326. # raise NotImplementedError("start state {} isn't a state"
  327. # .format(startState))
  328. # if not isinstance(inputToken, MethodicalInput):
  329. # raise NotImplementedError("start state {} isn't an input"
  330. # .format(inputToken))
  331. # if not isinstance(endState, MethodicalState):
  332. # raise NotImplementedError("end state {} isn't a state"
  333. # .format(startState))
  334. # for output in outputTokens:
  335. # if not isinstance(endState, MethodicalState):
  336. # raise NotImplementedError("output state {} isn't a state"
  337. # .format(endState))
  338. self._automaton.addTransition(startState, inputToken, endState,
  339. tuple(outputTokens))
  340. inputToken.collectors[startState] = collector
  341. @_keywords_only
  342. def serializer(self):
  343. """
  344. """
  345. def decorator(decoratee):
  346. @wraps(decoratee)
  347. def serialize(oself):
  348. transitioner = _transitionerFromInstance(oself, self._symbol,
  349. self._automaton)
  350. return decoratee(oself, transitioner._state.serialized)
  351. return serialize
  352. return decorator
  353. @_keywords_only
  354. def unserializer(self):
  355. """
  356. """
  357. def decorator(decoratee):
  358. @wraps(decoratee)
  359. def unserialize(oself, *args, **kwargs):
  360. state = decoratee(oself, *args, **kwargs)
  361. mapping = {}
  362. for eachState in self._automaton.states():
  363. mapping[eachState.serialized] = eachState
  364. transitioner = _transitionerFromInstance(
  365. oself, self._symbol, self._automaton)
  366. transitioner._state = mapping[state]
  367. return None # it's on purpose
  368. return unserialize
  369. return decorator
  370. @property
  371. def _setTrace(self):
  372. return MethodicalTracer(self._automaton, self._symbol)
  373. def asDigraph(self):
  374. """
  375. Generate a L{graphviz.Digraph} that represents this machine's
  376. states and transitions.
  377. @return: L{graphviz.Digraph} object; for more information, please
  378. see the documentation for
  379. U{graphviz<https://graphviz.readthedocs.io/>}
  380. """
  381. from ._visualize import makeDigraph
  382. return makeDigraph(
  383. self._automaton,
  384. stateAsString=lambda state: state.method.__name__,
  385. inputAsString=lambda input: input.method.__name__,
  386. outputAsString=lambda output: output.method.__name__,
  387. )