You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

decoder.py 7.5KB

5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. #
  2. # This file is part of pyasn1 software.
  3. #
  4. # Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
  5. # License: http://snmplabs.com/pyasn1/license.html
  6. #
  7. from pyasn1 import debug
  8. from pyasn1 import error
  9. from pyasn1.type import base
  10. from pyasn1.type import char
  11. from pyasn1.type import tag
  12. from pyasn1.type import univ
  13. from pyasn1.type import useful
# Public API of this module: only the ready-made ``decode`` callable
# (a ``Decoder`` instance created at the bottom of the file) is exported.
__all__ = ['decode']

# Logger handle obtained from pyasn1's debug facility for this module.
# The decoder tests it for truthiness before every logging call.
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)
  16. class AbstractScalarDecoder(object):
  17. def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
  18. return asn1Spec.clone(pyObject)
  19. class BitStringDecoder(AbstractScalarDecoder):
  20. def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
  21. return asn1Spec.clone(univ.BitString.fromBinaryString(pyObject))
  22. class SequenceOrSetDecoder(object):
  23. def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
  24. asn1Value = asn1Spec.clone()
  25. componentsTypes = asn1Spec.componentType
  26. for field in asn1Value:
  27. if field in pyObject:
  28. asn1Value[field] = decodeFun(pyObject[field], componentsTypes[field].asn1Object, **options)
  29. return asn1Value
  30. class SequenceOfOrSetOfDecoder(object):
  31. def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
  32. asn1Value = asn1Spec.clone()
  33. for pyValue in pyObject:
  34. asn1Value.append(decodeFun(pyValue, asn1Spec.componentType), **options)
  35. return asn1Value
  36. class ChoiceDecoder(object):
  37. def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
  38. asn1Value = asn1Spec.clone()
  39. componentsTypes = asn1Spec.componentType
  40. for field in pyObject:
  41. if field in componentsTypes:
  42. asn1Value[field] = decodeFun(pyObject[field], componentsTypes[field].asn1Object, **options)
  43. break
  44. return asn1Value
# Value-decoder lookup table keyed by tag set.
#
# ``Decoder.__call__`` consults this table only as a fallback: when the
# schema object's ``typeId`` is not found in the type-keyed table, it
# rebuilds a tag set from the schema's base tag and looks that up here.
# Every entry holds its own decoder instance.
tagMap = {
    univ.Integer.tagSet: AbstractScalarDecoder(),
    univ.Boolean.tagSet: AbstractScalarDecoder(),
    univ.BitString.tagSet: BitStringDecoder(),
    univ.OctetString.tagSet: AbstractScalarDecoder(),
    univ.Null.tagSet: AbstractScalarDecoder(),
    univ.ObjectIdentifier.tagSet: AbstractScalarDecoder(),
    univ.Enumerated.tagSet: AbstractScalarDecoder(),
    univ.Real.tagSet: AbstractScalarDecoder(),
    # The three entries below share their tag set with another type, so a
    # tag-based lookup cannot tell the two apart.
    univ.Sequence.tagSet: SequenceOrSetDecoder(),  # conflicts with SequenceOf
    univ.Set.tagSet: SequenceOrSetDecoder(),  # conflicts with SetOf
    univ.Choice.tagSet: ChoiceDecoder(),  # conflicts with Any
    # character string types
    char.UTF8String.tagSet: AbstractScalarDecoder(),
    char.NumericString.tagSet: AbstractScalarDecoder(),
    char.PrintableString.tagSet: AbstractScalarDecoder(),
    char.TeletexString.tagSet: AbstractScalarDecoder(),
    char.VideotexString.tagSet: AbstractScalarDecoder(),
    char.IA5String.tagSet: AbstractScalarDecoder(),
    char.GraphicString.tagSet: AbstractScalarDecoder(),
    char.VisibleString.tagSet: AbstractScalarDecoder(),
    char.GeneralString.tagSet: AbstractScalarDecoder(),
    char.UniversalString.tagSet: AbstractScalarDecoder(),
    char.BMPString.tagSet: AbstractScalarDecoder(),
    # useful types
    useful.ObjectDescriptor.tagSet: AbstractScalarDecoder(),
    useful.GeneralizedTime.tagSet: AbstractScalarDecoder(),
    useful.UTCTime.tagSet: AbstractScalarDecoder()
}
# Value-decoder lookup table keyed by ``typeId``.
#
# This is the first table ``Decoder.__call__`` consults. Unlike the
# tag-keyed table it has separate entries for SequenceOf, SetOf and Any,
# which the tag-keyed table marks as conflicting with Sequence, Set and
# Choice respectively.
#
# Put in ambiguous & non-ambiguous types for faster codec lookup
typeMap = {
    univ.Integer.typeId: AbstractScalarDecoder(),
    univ.Boolean.typeId: AbstractScalarDecoder(),
    univ.BitString.typeId: BitStringDecoder(),
    univ.OctetString.typeId: AbstractScalarDecoder(),
    univ.Null.typeId: AbstractScalarDecoder(),
    univ.ObjectIdentifier.typeId: AbstractScalarDecoder(),
    univ.Enumerated.typeId: AbstractScalarDecoder(),
    univ.Real.typeId: AbstractScalarDecoder(),
    # ambiguous base types
    univ.Set.typeId: SequenceOrSetDecoder(),
    univ.SetOf.typeId: SequenceOfOrSetOfDecoder(),
    univ.Sequence.typeId: SequenceOrSetDecoder(),
    univ.SequenceOf.typeId: SequenceOfOrSetOfDecoder(),
    univ.Choice.typeId: ChoiceDecoder(),
    univ.Any.typeId: AbstractScalarDecoder(),
    # character string types
    char.UTF8String.typeId: AbstractScalarDecoder(),
    char.NumericString.typeId: AbstractScalarDecoder(),
    char.PrintableString.typeId: AbstractScalarDecoder(),
    char.TeletexString.typeId: AbstractScalarDecoder(),
    char.VideotexString.typeId: AbstractScalarDecoder(),
    char.IA5String.typeId: AbstractScalarDecoder(),
    char.GraphicString.typeId: AbstractScalarDecoder(),
    char.VisibleString.typeId: AbstractScalarDecoder(),
    char.GeneralString.typeId: AbstractScalarDecoder(),
    char.UniversalString.typeId: AbstractScalarDecoder(),
    char.BMPString.typeId: AbstractScalarDecoder(),
    # useful types
    useful.ObjectDescriptor.typeId: AbstractScalarDecoder(),
    useful.GeneralizedTime.typeId: AbstractScalarDecoder(),
    useful.UTCTime.typeId: AbstractScalarDecoder()
}
  108. class Decoder(object):
  109. # noinspection PyDefaultArgument
  110. def __init__(self, tagMap, typeMap):
  111. self.__tagMap = tagMap
  112. self.__typeMap = typeMap
  113. def __call__(self, pyObject, asn1Spec, **options):
  114. if LOG:
  115. debug.scope.push(type(pyObject).__name__)
  116. LOG('decoder called at scope %s, working with type %s' % (debug.scope, type(pyObject).__name__))
  117. if asn1Spec is None or not isinstance(asn1Spec, base.Asn1Item):
  118. raise error.PyAsn1Error('asn1Spec is not valid (should be an instance of an ASN.1 Item, not %s)' % asn1Spec.__class__.__name__)
  119. try:
  120. valueDecoder = self.__typeMap[asn1Spec.typeId]
  121. except KeyError:
  122. # use base type for codec lookup to recover untagged types
  123. baseTagSet = tag.TagSet(asn1Spec.tagSet.baseTag, asn1Spec.tagSet.baseTag)
  124. try:
  125. valueDecoder = self.__tagMap[baseTagSet]
  126. except KeyError:
  127. raise error.PyAsn1Error('Unknown ASN.1 tag %s' % asn1Spec.tagSet)
  128. if LOG:
  129. LOG('calling decoder %s on Python type %s <%s>' % (type(valueDecoder).__name__, type(pyObject).__name__, repr(pyObject)))
  130. value = valueDecoder(pyObject, asn1Spec, self, **options)
  131. if LOG:
  132. LOG('decoder %s produced ASN.1 type %s <%s>' % (type(valueDecoder).__name__, type(value).__name__, repr(value)))
  133. debug.scope.pop()
  134. return value
#: Turns Python objects of built-in types into ASN.1 objects.
#:
#: Takes Python objects of built-in types and turns them into a tree of
#: ASN.1 objects (e.g. :py:class:`~pyasn1.type.base.Asn1Item` derivative) which
#: may be a scalar or an arbitrary nested structure.
#:
#: Parameters
#: ----------
#: pyObject: :py:class:`object`
#:     A scalar or nested Python objects
#:
#: Keyword Args
#: ------------
#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.Asn1Item` derivative
#:     A pyasn1 type object to act as a template guiding the decoder. It is required
#:     for successful interpretation of Python objects mapping into their ASN.1
#:     representations.
#:
#: Returns
#: -------
#: : :py:class:`~pyasn1.type.base.Asn1Item` derivative
#:     A scalar or constructed pyasn1 object
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error
#:     On decoding errors
#:
#: Examples
#: --------
#: Decode native Python object into ASN.1 objects with ASN.1 schema.
#: The call returns the decoded object itself, not a tuple.
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> s = decode([1, 2, 3], asn1Spec=seq)
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
decode = Decoder(tagMap, typeMap)