123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- #
- # This file is part of pyasn1 software.
- #
- # Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
- # License: https://pyasn1.readthedocs.io/en/latest/license.html
- #
- from pyasn1 import error
- from pyasn1.codec.ber import encoder
- from pyasn1.compat.octets import str2octs, null
- from pyasn1.type import univ
- from pyasn1.type import useful
-
- __all__ = ['Encoder', 'encode']
-
-
class BooleanEncoder(encoder.IntegerEncoder):
    """CER encoder for BOOLEAN values."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the single content octet for a BOOLEAN.

        A `value` that compares equal to zero yields octet 0, any other
        value yields octet 255.  `asn1Spec`, `encodeFun` and `options`
        are accepted for interface compatibility and are not used.

        Returns a three-element tuple: the content as a one-element tuple
        of integers followed by two `False` flags.
        """
        # NOTE(review): the two flags presumably mean "is constructed" and
        # "substrate is already octets" -- confirm against the BER encoder
        octet = 0 if value == 0 else 255
        return (octet,), False, False
-
-
class RealEncoder(encoder.RealEncoder):
    """CER encoder for REAL values."""

    def _chooseEncBase(self, value):
        """Hand the three parts of `value` to `_dropFloatingPoint`.

        `value` must unpack into exactly three items (taken here as
        mantissa, base and exponent); the helper's result is returned
        unchanged.
        """
        mantissa, base, exponent = value
        result = self._dropFloatingPoint(mantissa, base, exponent)
        return result
-
-
- # specialized GeneralStringEncoder here
-
class TimeEncoderMixIn(object):
    """Value encoding shared by the CER time string encoders.

    The mix-in validates and normalises the textual time value and then
    delegates to `encoder.OctetStringEncoder.encodeValue`.  Concrete
    encoders override `MIN_LENGTH` / `MAX_LENGTH`.
    """
    Z_CHAR = ord('Z')
    PLUS_CHAR = ord('+')
    MINUS_CHAR = ord('-')
    COMMA_CHAR = ord(',')
    DOT_CHAR = ord('.')
    ZERO_CHAR = ord('0')

    # Exclusive bounds on the length of the normalised time string
    MIN_LENGTH = 12
    MAX_LENGTH = 19

    @classmethod
    def _dropTrailingFractionZeros(cls, numbers):
        """Return `numbers` as a list without trailing fraction zeros.

        `numbers` is a sequence of character codes that contains a dot
        and whose last item is the time zone character.  Zeros sitting
        directly in front of that last item are removed; if no fraction
        digit remains, the dot is removed as well.  Zeros followed by a
        non-zero fraction digit are kept because dropping them would
        change the time value (".099" is not ".99").
        """
        numbers = list(numbers)

        dotIndex = numbers.index(cls.DOT_CHAR)

        # `end` is one past the last fraction character that is kept
        end = len(numbers) - 1

        while end - 1 > dotIndex and numbers[end - 1] == cls.ZERO_CHAR:
            end -= 1

        if end - 1 == dotIndex:
            # drop hanging dot
            end = dotIndex

        return numbers[:end] + numbers[-1:]

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Validate, normalise and encode a time value.

        Parameters
        ----------
        value:
            Time value; cloned through `asn1Spec` when one is given.  Must
            offer `asNumbers()` and `clone()`.
        asn1Spec:
            Optional schema object used to build the value.
        encodeFun:
            Passed through to the octet string encoder.
        **options:
            Encoder options; `maxChunkSize` is forced to 1000.

        Returns
        -------
        Whatever `encoder.OctetStringEncoder.encodeValue` returns.

        Raises
        ------
        error.PyAsn1Error
            If the value contains "+" or "-", is empty or does not end
            with "Z", contains a comma, or its normalised length is not
            strictly between `MIN_LENGTH` and `MAX_LENGTH`.
        """
        # CER encoding constraints:
        # - minutes are mandatory, seconds are optional
        # - sub-seconds must NOT be zero / no meaningless zeros
        # - no hanging fraction dot
        # - time in UTC (Z)
        # - only dot is allowed for fractions

        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        numbers = value.asNumbers()

        if self.PLUS_CHAR in numbers or self.MINUS_CHAR in numbers:
            raise error.PyAsn1Error('Must be UTC time: %r' % value)

        # an empty value has no last character to look at
        if not numbers or numbers[-1] != self.Z_CHAR:
            raise error.PyAsn1Error('Missing "Z" time zone specifier: %r' % value)

        if self.COMMA_CHAR in numbers:
            raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value)

        if self.DOT_CHAR in numbers:
            trimmed = self._dropTrailingFractionZeros(numbers)

            # only rebuild the value object when something was removed
            if len(trimmed) != len(numbers):
                value = value.clone(trimmed)

            numbers = trimmed

        if not self.MIN_LENGTH < len(numbers) < self.MAX_LENGTH:
            raise error.PyAsn1Error('Length constraint violated: %r' % value)

        options.update(maxChunkSize=1000)

        return encoder.OctetStringEncoder.encodeValue(
            self, value, asn1Spec, encodeFun, **options
        )
-
-
class GeneralizedTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
    """CER encoder for GeneralizedTime values."""
    # exclusive length bounds checked by TimeEncoderMixIn.encodeValue
    MIN_LENGTH, MAX_LENGTH = 12, 20
-
-
class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
    """CER encoder for UTCTime values."""
    # exclusive length bounds checked by TimeEncoderMixIn.encodeValue
    MIN_LENGTH, MAX_LENGTH = 10, 14
-
-
class SetOfEncoder(encoder.SequenceOfEncoder):
    """CER encoder for SET OF values."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode all components and concatenate them in sorted order.

        The encoded components are compared after right-padding each one
        with zero octets to the length of the longest; the unpadded
        encodings are what gets joined.  Returns the joined substrate
        followed by two `True` flags.
        """
        encoded = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        # a single (or no) component needs no ordering
        if len(encoded) > 1:
            padOctet = str2octs('\x00')
            width = max(len(item) for item in encoded)

            # stable sort keyed on the padded form of each component
            encoded = sorted(
                encoded, key=lambda item: item.ljust(width, padOctet))

        return null.join(encoded), True, True
-
-
class SequenceOfEncoder(encoder.SequenceOfEncoder):
    """CER encoder for SEQUENCE OF values."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode the components of `value` in their existing order.

        When the `ifNotEmpty` option is truthy and `value` has no items,
        an empty substrate is returned without encoding anything.
        Returns the substrate followed by two `True` flags.
        """
        skipWhenEmpty = options.get('ifNotEmpty', False)

        if skipWhenEmpty and len(value) == 0:
            return null, True, True

        encoded = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        return null.join(encoded), True, True
-
-
class SetEncoder(encoder.SequenceEncoder):
    """CER encoder for SET values; components are emitted sorted by tag."""

    @staticmethod
    def _componentSortKey(componentAndType):
        """Sort SET components by tag

        Sort regardless of the Choice value (static sort)
        """
        component, asn1Spec = componentAndType

        if asn1Spec is None:
            asn1Spec = component

        # an untagged Choice is ordered by its component type's minTagSet
        if asn1Spec.typeId == univ.Choice.typeId and not asn1Spec.tagSet:
            return asn1Spec.componentType.minTagSet

        return asn1Spec.tagSet

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode the components of a SET in tag order.

        Parameters
        ----------
        value:
            Either a schema-carrying object (when `asn1Spec` is None) or
            a plain mapping of component names to values.
        asn1Spec:
            Schema describing `value`, or None.
        encodeFun:
            Callable used to encode each component.
        **options:
            Encoder options; `ifNotEmpty` is set per named component
            from its `isOptional` flag.

        Returns
        -------
        The concatenated component encodings followed by two `True`
        flags.

        Raises
        ------
        error.PyAsn1Error
            If a non-optional component named by `asn1Spec` is missing
            from `value`.  With no `asn1Spec`, whatever
            `value.isInconsistent` holds is raised when it is truthy.
        """
        # (component, component spec, named type) triples.  The named type
        # travels with its component rather than being looked up by
        # id(component): equal plain values may be one and the same object,
        # which would make such a lookup return the wrong named type.
        comps = []

        if asn1Spec is None:
            # instance of ASN.1 schema
            inconsistency = value.isInconsistent
            if inconsistency:
                raise inconsistency

            namedTypes = value.componentType

            for idx, component in enumerate(value.values()):
                namedType = None

                if namedTypes:
                    namedType = namedTypes[idx]

                    if namedType.isOptional and not component.isValue:
                        continue

                    if namedType.isDefaulted and component == namedType.asn1Object:
                        continue

                comps.append((component, asn1Spec, namedType))

        else:
            # bare Python value + ASN.1 schema
            for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):

                try:
                    component = value[namedType.name]

                except KeyError:
                    # an absent optional component is simply left out
                    if namedType.isOptional:
                        continue

                    raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value))

                if namedType.isOptional and namedType.name not in value:
                    continue

                if namedType.isDefaulted and component == namedType.asn1Object:
                    continue

                comps.append((component, asn1Spec[idx], namedType))

        substrate = null

        # the sort key only looks at the (component, spec) pair
        for comp, compType, namedType in sorted(
                comps, key=lambda item: self._componentSortKey(item[:2])):

            if namedType:
                options.update(ifNotEmpty=namedType.isOptional)

            chunk = encodeFun(comp, compType, **options)

            # wrap open type blob if needed
            if namedType and namedType.openType:
                wrapType = namedType.asn1Object
                if wrapType.tagSet and not wrapType.isSameTypeWith(comp):
                    chunk = encodeFun(chunk, wrapType, **options)

            substrate += chunk

        return substrate, True, True
-
-
class SequenceEncoder(encoder.SequenceEncoder):
    """CER encoder for SEQUENCE values."""
    # Switch read by the base encoder, which is not visible in this module.
    # NOTE(review): presumably makes it leave out empty OPTIONAL
    # components -- confirm against encoder.SequenceEncoder
    omitEmptyOptionals = True
-
-
# Encoder lookup by tag set: a copy of the BER table with the CER-specific
# encoders of this module layered on top.
TAG_MAP = encoder.TAG_MAP.copy()

TAG_MAP[univ.Boolean.tagSet] = BooleanEncoder()
TAG_MAP[univ.Real.tagSet] = RealEncoder()
TAG_MAP[useful.GeneralizedTime.tagSet] = GeneralizedTimeEncoder()
TAG_MAP[useful.UTCTime.tagSet] = UTCTimeEncoder()
# Sequence & Set have same tags as SequenceOf & SetOf
TAG_MAP[univ.SetOf.tagSet] = SetOfEncoder()
# NOTE(review): this key is a typeId while every other key of this table
# is a tagSet -- confirm whether that is intended
TAG_MAP[univ.Sequence.typeId] = SequenceEncoder()

# Encoder lookup by type id, built the same way from the BER table.
TYPE_MAP = encoder.TYPE_MAP.copy()

TYPE_MAP[univ.Boolean.typeId] = BooleanEncoder()
TYPE_MAP[univ.Real.typeId] = RealEncoder()
TYPE_MAP[useful.GeneralizedTime.typeId] = GeneralizedTimeEncoder()
TYPE_MAP[useful.UTCTime.typeId] = UTCTimeEncoder()
# Sequence & Set have same tags as SequenceOf & SetOf
TYPE_MAP[univ.Set.typeId] = SetEncoder()
TYPE_MAP[univ.SetOf.typeId] = SetOfEncoder()
TYPE_MAP[univ.Sequence.typeId] = SequenceEncoder()
TYPE_MAP[univ.SequenceOf.typeId] = SequenceOfEncoder()

# deprecated aliases, https://github.com/pyasn1/pyasn1/issues/9
tagMap = TAG_MAP
typeMap = TYPE_MAP
-
-
class SingleItemEncoder(encoder.SingleItemEncoder):
    """Single-item encoder wired to the CER lookup tables of this module."""
    # module-level tables defined above, bound here as class attributes
    TAG_MAP = TAG_MAP
    TYPE_MAP = TYPE_MAP

    # Settings read by the base class, which is not visible in this module.
    # NOTE(review): presumably select indefinite-length encoding with
    # 1000-octet chunks -- confirm against encoder.SingleItemEncoder
    fixedDefLengthMode = False
    fixedChunkSize = 1000
-
-
class Encoder(encoder.Encoder):
    """CER flavour of the encoder, using this module's SingleItemEncoder."""
    SINGLE_ITEM_ENCODER = SingleItemEncoder
-
-
#: Turns ASN.1 object into CER octet stream.
#:
#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative),
#: walks all its components recursively and produces a CER octet stream.
#:
#: Parameters
#: ----------
#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     A Python or pyasn1 object to encode. If a Python object is given, the
#:     `asn1Spec` parameter is required to guide the encoding process.
#:
#: Keyword Args
#: ------------
#: asn1Spec:
#:     Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:
#: Returns
#: -------
#: : :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
#:     Given ASN.1 object encoded into CER octet-stream
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error
#:     On encoding errors
#:
#: Examples
#: --------
#: Encode Python value into CER with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> encode([1, 2, 3], asn1Spec=seq)
#:    b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
#:
#: Encode ASN.1 value object into CER
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> seq.extend([1, 2, 3])
#:    >>> encode(seq)
#:    b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
#:
encode = Encoder()
-
- # EncoderFactory queries class instance and builds a map of tags -> encoders
|