224 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			224 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) 2016 Anki, Inc.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License in the file LICENSE.txt or at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| '''
 | |
| Animation related classes, functions, events and values.
 | |
| '''
 | |
| 
 | |
| # __all__ should order by constants, event classes, other classes, functions.
 | |
| __all__ = ['EvtAnimationsLoaded', 'EvtAnimationCompleted',
 | |
|            'Animation', 'AnimationTrigger', 'AnimationNames', 'Triggers',
 | |
|            'animation_completed_filter']
 | |
| 
 | |
| import collections
 | |
| 
 | |
| from . import logger
 | |
| 
 | |
| from . import action
 | |
| from . import exceptions
 | |
| from . import event
 | |
| 
 | |
| from ._clad import _clad_to_engine_iface, _clad_to_engine_cozmo
 | |
| 
 | |
| 
 | |
| class EvtAnimationsLoaded(event.Event):
 | |
|     '''Triggered when animations names have been received from the engine'''
 | |
| 
 | |
| 
 | |
| class EvtAnimationCompleted(action.EvtActionCompleted):
 | |
|     '''Triggered when an animation completes.'''
 | |
|     animation_name = "The name of the animation or trigger that completed"
 | |
| 
 | |
| 
 | |
| class Animation(action.Action):
 | |
|     '''An Animation describes an actively-playing animation on a robot.'''
 | |
|     def __init__(self, anim_name, loop_count, ignore_body_track=False,
 | |
|                  ignore_head_track=False, ignore_lift_track=False, **kw):
 | |
|         super().__init__(**kw)
 | |
| 
 | |
|         #: The name of the animation that was dispatched
 | |
|         self.anim_name = anim_name
 | |
| 
 | |
|         #: The number of iterations the animation was requested for
 | |
|         self.loop_count = loop_count
 | |
| 
 | |
|         #: bool: True to ignore the body track (i.e. the wheels / treads)
 | |
|         self.ignore_body_track = ignore_body_track
 | |
| 
 | |
|         #: bool: True to ignore the head track
 | |
|         self.ignore_head_track = ignore_head_track
 | |
| 
 | |
|         #: bool: True to ignore the lift track
 | |
|         self.ignore_lift_track = ignore_lift_track
 | |
| 
 | |
| 
 | |
|     def _repr_values(self):
 | |
|         all_tracks = {"body":self.ignore_body_track,
 | |
|                       "head":self.ignore_head_track,
 | |
|                       "lift":self.ignore_lift_track}
 | |
|         ignore_tracks = [k for k, v in all_tracks.items() if v]
 | |
| 
 | |
|         return "anim_name=%s loop_count=%s ignore_tracks=%s" % (self.anim_name, self.loop_count, str(ignore_tracks))
 | |
| 
 | |
|     def _encode(self):
 | |
|         return _clad_to_engine_iface.PlayAnimation(animationName=self.anim_name, numLoops=self.loop_count, ignoreBodyTrack=self.ignore_body_track,
 | |
|             ignoreHeadTrack=self.ignore_head_track, ignoreLiftTrack=self.ignore_lift_track)
 | |
| 
 | |
|     def _dispatch_completed_event(self, msg):
 | |
|         self._completed_event = EvtAnimationCompleted(
 | |
|                 action=self, state=self._state,
 | |
|                 animation_name=self.anim_name)
 | |
|         self.dispatch_event(self._completed_event)
 | |
| 
 | |
| 
 | |
| class AnimationTrigger(action.Action):
 | |
|     '''An AnimationTrigger represents a playing animation trigger.
 | |
| 
 | |
|     Asking Cozmo to play an AnimationTrigger causes him to pick one of the
 | |
|     animations represented by the group.
 | |
|     '''
 | |
|     def __init__(self, trigger, loop_count, use_lift_safe, ignore_body_track,
 | |
|                  ignore_head_track, ignore_lift_track, **kw):
 | |
|         super().__init__(**kw)
 | |
| 
 | |
|         #: An attribute of :class:`cozmo.anim.Triggers`: The animation trigger dispatched.
 | |
|         self.trigger = trigger
 | |
| 
 | |
|         #: int: The number of iterations the animation was requested for
 | |
|         self.loop_count = loop_count
 | |
| 
 | |
|         #: bool: True to automatically ignore the lift track if Cozmo is carrying a cube.
 | |
|         self.use_lift_safe = use_lift_safe
 | |
| 
 | |
|         #: bool: True to ignore the body track (i.e. the wheels / treads)
 | |
|         self.ignore_body_track = ignore_body_track
 | |
| 
 | |
|         #: bool: True to ignore the head track
 | |
|         self.ignore_head_track = ignore_head_track
 | |
| 
 | |
|         #: bool: True to ignore the lift track
 | |
|         self.ignore_lift_track = ignore_lift_track
 | |
| 
 | |
|     def _repr_values(self):
 | |
|         all_tracks = {"body":self.ignore_body_track,
 | |
|                       "head":self.ignore_head_track,
 | |
|                       "lift":self.ignore_lift_track}
 | |
|         ignore_tracks = [k for k, v in all_tracks.items() if v]
 | |
| 
 | |
|         return "trigger=%s loop_count=%s ignore_tracks=%s use_lift_safe=%s" % (
 | |
|             self.trigger.name, self.loop_count, str(ignore_tracks), self.use_lift_safe)
 | |
| 
 | |
|     def _encode(self):
 | |
|         return _clad_to_engine_iface.PlayAnimationTrigger(
 | |
|             trigger=self.trigger.id, numLoops=self.loop_count,
 | |
|             useLiftSafe=self.use_lift_safe, ignoreBodyTrack=self.ignore_body_track,
 | |
|             ignoreHeadTrack=self.ignore_head_track, ignoreLiftTrack=self.ignore_lift_track)
 | |
| 
 | |
|     def _dispatch_completed_event(self, msg):
 | |
|         self._completed_event = EvtAnimationCompleted(
 | |
|                 action=self, state=self._state,
 | |
|                 animation_name=self.trigger.name)
 | |
|         self.dispatch_event(self._completed_event)
 | |
| 
 | |
| 
 | |
| class AnimationNames(event.Dispatcher, set):
 | |
|     '''Holds the set of animation names (strings) returned from the Engine.
 | |
| 
 | |
|     Animation names are dynamically retrieved from the engine when the SDK
 | |
|     connects to it, unlike :class:`Triggers` which are defined at runtime.
 | |
|     '''
 | |
|     def __init__(self, conn, **kw):
 | |
|         super().__init__(self, **kw)
 | |
|         self._conn = conn
 | |
|         self._loaded = False
 | |
| 
 | |
|     def __contains__(self, key):
 | |
|         if not self._loaded:
 | |
|             raise exceptions.AnimationsNotLoaded("Animations not yet received from engine")
 | |
|         return super().__contains__(key)
 | |
| 
 | |
|     def __hash__(self):
 | |
|         # We want to compare AnimationName instances rather than the
 | |
|         # names they contain
 | |
|         return id(self)
 | |
| 
 | |
|     def refresh(self):
 | |
|         '''Causes the list of animation names to be re-requested from the engine.
 | |
| 
 | |
|         Attempting to play an animation while the list is refreshing will result
 | |
|         in an AnimationsNotLoaded exception being raised.
 | |
| 
 | |
|         Generates an EvtAnimationsLoaded event once completed.
 | |
|         '''
 | |
|         self._loaded = False
 | |
|         self.clear()
 | |
|         self._conn.send_msg(_clad_to_engine_iface.RequestAvailableAnimations())
 | |
| 
 | |
|     @property
 | |
|     def is_loaded(self):
 | |
|         '''bool: True if the animation names have been received from the engine.'''
 | |
|         return self._loaded != False
 | |
| 
 | |
|     async def wait_for_loaded(self, timeout=None):
 | |
|         '''Wait for the animation names to be loaded from the engine.
 | |
| 
 | |
|         Returns:
 | |
|                 The :class:`EvtAnimationsLoaded` instance once loaded
 | |
|         Raises:
 | |
|             :class:`asyncio.TimeoutError`
 | |
|         '''
 | |
|         if self._loaded:
 | |
|             return self._loaded
 | |
|         return await self.wait_for(EvtAnimationsLoaded, timeout=timeout)
 | |
| 
 | |
|     def _recv_msg_animation_available(self, evt, msg):
 | |
|         name = msg.animName
 | |
|         self.add(name)
 | |
| 
 | |
|     def _recv_msg_end_of_message(self, evt, msg):
 | |
|         if not self._loaded:
 | |
|             logger.debug("%d animations loaded", len(self))
 | |
|             self._loaded = evt
 | |
|             self.dispatch_event(EvtAnimationsLoaded)
 | |
| 
 | |
| 
 | |
| # generate names for each CLAD defined trigger
 | |
| 
 | |
| _AnimTrigger = collections.namedtuple('_AnimTrigger', 'name id')
 | |
| class Triggers:
 | |
|     """Playing an animation trigger causes the game engine play an animation of a particular type.
 | |
| 
 | |
|     The engine may pick one of a number of actual animations to play based on
 | |
|     Cozmo's mood or emotion, or with random weighting.  Thus playing the same
 | |
|     trigger twice may not result in the exact same underlying animation playing
 | |
|     twice.
 | |
| 
 | |
|     To play an exact animation, use play_anim with a named animation.
 | |
| 
 | |
|     This class holds the set of defined animations triggers to pass to play_anim_trigger.
 | |
|     """
 | |
|     trigger_list = []
 | |
| 
 | |
| for (_name, _id) in _clad_to_engine_cozmo.AnimationTrigger.__dict__.items():
 | |
|     if not _name.startswith('_'):
 | |
|         trigger = _AnimTrigger(_name, _id)
 | |
|         setattr(Triggers, _name, trigger)
 | |
|         Triggers.trigger_list.append(trigger)
 | |
| 
 | |
| 
 | |
| def animation_completed_filter():
 | |
|     '''Creates an :class:`cozmo.event.Filter` to wait specifically for an animation completed event.'''
 | |
|     return event.Filter(action.EvtActionCompleted,
 | |
|         action=lambda action: isinstance(action, Animation))
 | 
