Development of an internal social media platform with personalised dashboards for students

ldif3.py 11KB

  1. """ldif3 - generate and parse LDIF data (see RFC 2849)."""
  2. from __future__ import unicode_literals
  3. __version__ = '3.1.1'
  4. __all__ = [
  5. # constants
  6. 'LDIF_PATTERN',
  7. # classes
  8. 'LDIFWriter',
  9. 'LDIFParser',
  10. ]
  11. import base64
  12. import re
  13. import logging
  14. from collections import OrderedDict
  15. try: # pragma: nocover
  16. from urlparse import urlparse
  17. from urllib import urlopen
  18. except ImportError: # pragma: nocover
  19. from urllib.parse import urlparse
  20. from urllib.request import urlopen
  21. log = logging.getLogger('ldif3')
  22. ATTRTYPE_PATTERN = r'[\w;.-]+(;[\w_-]+)*'
  23. ATTRVALUE_PATTERN = r'(([^,]|\\,)+|".*?")'
  24. ATTR_PATTERN = ATTRTYPE_PATTERN + r'[ ]*=[ ]*' + ATTRVALUE_PATTERN
  25. RDN_PATTERN = ATTR_PATTERN + r'([ ]*\+[ ]*' + ATTR_PATTERN + r')*[ ]*'
  26. DN_PATTERN = RDN_PATTERN + r'([ ]*,[ ]*' + RDN_PATTERN + r')*[ ]*'
  27. DN_REGEX = re.compile('^%s$' % DN_PATTERN)
  28. LDIF_PATTERN = ('^((dn(:|::) %(DN_PATTERN)s)|(%(ATTRTYPE_PATTERN)'
  29. 's(:|::) .*)$)+' % vars())
  30. MOD_OPS = ['add', 'delete', 'replace']
  31. CHANGE_TYPES = ['add', 'delete', 'modify', 'modrdn']


def is_dn(s):
    """Return True if s is an LDAP DN."""
    if s == '':
        return True
    rm = DN_REGEX.match(s)
    return rm is not None and rm.group(0) == s
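
# Illustrative examples (the values are made up): is_dn accepts the empty
# string and any string matching DN_REGEX, e.g.
#   is_dn('')                            -> True
#   is_dn('cn=alice,dc=example,dc=com')  -> True
#   is_dn('not a dn')                    -> False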


UNSAFE_STRING_PATTERN = '(^[ :<]|[\000\n\r\200-\377])'
UNSAFE_STRING_RE = re.compile(UNSAFE_STRING_PATTERN)


def lower(l):
    """Return a list with the lowercased items of l."""
    return [i.lower() for i in l or []]


class LDIFWriter(object):
    """Write LDIF entry or change records to file object.

    :type output_file: file-like object in binary mode
    :param output_file: File for output

    :type base64_attrs: List[string]
    :param base64_attrs: List of attribute types to be base64-encoded in any
        case

    :type cols: int
    :param cols: Specifies how many columns a line may have before it is
        folded into many lines

    :type line_sep: bytearray
    :param line_sep: line separator
    """

    def __init__(
            self, output_file, base64_attrs=[], cols=76, line_sep=b'\n'):
        self._output_file = output_file
        self._base64_attrs = lower(base64_attrs)
        self._cols = cols
        self._line_sep = line_sep

        self.records_written = 0  #: number of records that have been written

    def _fold_line(self, line):
        """Write string line as one or more folded lines."""
        if len(line) <= self._cols:
            self._output_file.write(line)
            self._output_file.write(self._line_sep)
        else:
            pos = self._cols
            self._output_file.write(line[0:self._cols])
            self._output_file.write(self._line_sep)
            while pos < len(line):
                self._output_file.write(b' ')
                end = min(len(line), pos + self._cols - 1)
                self._output_file.write(line[pos:end])
                self._output_file.write(self._line_sep)
                pos = end

    def _needs_base64_encoding(self, attr_type, attr_value):
        """Return True if attr_value has to be base-64 encoded.

        This is the case because of special chars or because attr_type is in
        self._base64_attrs
        """
        return attr_type.lower() in self._base64_attrs or \
            UNSAFE_STRING_RE.search(attr_value) is not None

    def _unparse_attr(self, attr_type, attr_value):
        """Write a single attribute type/value pair."""
        if self._needs_base64_encoding(attr_type, attr_value):
            encoded = base64.encodestring(attr_value.encode('utf8'))\
                .replace(b'\n', b'')\
                .decode('utf8')
            line = ':: '.join([attr_type, encoded])
        else:
            line = ': '.join([attr_type, attr_value])
        self._fold_line(line.encode('utf8'))

    def _unparse_entry_record(self, entry):
        """
        :type entry: Dict[string, List[string]]
        :param entry: Dictionary holding an entry
        """
        for attr_type in sorted(entry.keys()):
            for attr_value in entry[attr_type]:
                self._unparse_attr(attr_type, attr_value)

    def _unparse_changetype(self, mod_len):
        """Detect and write the changetype."""
        if mod_len == 2:
            changetype = 'add'
        elif mod_len == 3:
            changetype = 'modify'
        else:
            raise ValueError("modlist item of wrong length")
        self._unparse_attr('changetype', changetype)

    def _unparse_change_record(self, modlist):
        """
        :type modlist: List[Tuple]
        :param modlist: List of additions (2-tuple) or modifications (3-tuple)
        """
        mod_len = len(modlist[0])
        self._unparse_changetype(mod_len)
        for mod in modlist:
            if len(mod) != mod_len:
                raise ValueError("Subsequent modlist item of wrong length")
            if mod_len == 2:
                mod_type, mod_vals = mod
            elif mod_len == 3:
                mod_op, mod_type, mod_vals = mod
                self._unparse_attr(MOD_OPS[mod_op], mod_type)
            for mod_val in mod_vals:
                self._unparse_attr(mod_type, mod_val)
            if mod_len == 3:
                self._output_file.write(b'-' + self._line_sep)

    def unparse(self, dn, record):
        """Write an entry or change record to the output file.

        :type dn: string
        :param dn: distinguished name

        :type record: Union[Dict[string, List[string]], List[Tuple]]
        :param record: Either a dictionary holding an entry or a list of
            additions (2-tuple) or modifications (3-tuple).
        """
        self._unparse_attr('dn', dn)
        if isinstance(record, dict):
            self._unparse_entry_record(record)
        elif isinstance(record, list):
            self._unparse_change_record(record)
        else:
            raise ValueError("Argument record must be dictionary or list")
        self._output_file.write(self._line_sep)
        self.records_written += 1
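

# A minimal usage sketch for LDIFWriter (illustrative only; the file name,
# DN and attribute values below are made up, not part of the module). The
# output file must be opened in binary mode:
#
#     with open('out.ldif', 'wb') as out:
#         writer = LDIFWriter(out)
#         writer.unparse('cn=alice,dc=example,dc=com', {
#             'objectClass': ['top', 'person'],
#             'cn': ['alice'],
#         })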


class LDIFParser(object):
    """Read LDIF entry or change records from file object.

    :type input_file: file-like object in binary mode
    :param input_file: file to read the LDIF input from

    :type ignored_attr_types: List[string]
    :param ignored_attr_types: List of attribute types that will be ignored

    :type process_url_schemes: List[bytearray]
    :param process_url_schemes: List of URL schemes to process with urllib.
        An empty list turns off all URL processing and the attribute is
        ignored completely.

    :type line_sep: bytearray
    :param line_sep: line separator

    :type strict: boolean
    :param strict: If set to ``False``, recoverable parse errors will produce
        log warnings rather than exceptions.
    """

    def _strip_line_sep(self, s):
        """Strip trailing line separators from s, but no other whitespace."""
        if s[-2:] == b'\r\n':
            return s[:-2]
        elif s[-1:] == b'\n':
            return s[:-1]
        else:
            return s

    def __init__(
            self,
            input_file,
            ignored_attr_types=[],
            process_url_schemes=[],
            line_sep=b'\n',
            strict=True):
        self._input_file = input_file
        self._process_url_schemes = lower(process_url_schemes)
        self._ignored_attr_types = lower(ignored_attr_types)
        self._line_sep = line_sep
        self._strict = strict

        self.line_counter = 0  #: number of lines that have been read
        self.byte_counter = 0  #: number of bytes that have been read
        self.records_read = 0  #: number of records that have been read

    def _iter_unfolded_lines(self):
        """Iterate over unfolded input lines. Skip comments."""
        line = self._input_file.readline()
        while line:
            self.line_counter += 1
            self.byte_counter += len(line)

            line = self._strip_line_sep(line)

            nextline = self._input_file.readline()
            while nextline and nextline[:1] == b' ':
                line += self._strip_line_sep(nextline)[1:]
                nextline = self._input_file.readline()

            if not line.startswith(b'#'):
                yield line
            line = nextline

    def _iter_blocks(self):
        """Iterate over input lines in blocks separated by blank lines."""
        lines = []
        for line in self._iter_unfolded_lines():
            if line:
                lines.append(line)
            else:
                self.records_read += 1
                yield lines
                lines = []
        if lines:
            self.records_read += 1
            yield lines
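
    # _parse_attr below distinguishes the three value forms defined in
    # RFC 2849: "attr: value" (plain), "attr:: value" (base64-encoded) and
    # "attr:< url" (the value is only fetched for schemes listed in
    # process_url_schemes).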

    def _parse_attr(self, line):
        """Parse a single attribute type/value pair."""
        colon_pos = line.index(b':')
        attr_type = line[0:colon_pos]
        if line[colon_pos:].startswith(b'::'):
            attr_value = base64.decodestring(line[colon_pos + 2:])
        elif line[colon_pos:].startswith(b':<'):
            url = line[colon_pos + 2:].strip()
            attr_value = b''
            if self._process_url_schemes:
                u = urlparse(url)
                if u[0] in self._process_url_schemes:
                    attr_value = urlopen(url.decode('ascii')).read()
        else:
            attr_value = line[colon_pos + 1:].strip()
        return attr_type.decode('utf8'), attr_value.decode('utf8')

    def _error(self, msg):
        if self._strict:
            raise ValueError(msg)
        else:
            log.warning(msg)

    def _check_dn(self, dn, attr_value):
        """Check dn attribute for issues."""
        if dn is not None:
            self._error('Two lines starting with dn: in one record.')
        if not is_dn(attr_value):
            self._error('No valid string-representation of '
                        'distinguished name %s.' % attr_value)

    def _check_changetype(self, dn, changetype, attr_value):
        """Check changetype attribute for issues."""
        if dn is None:
            self._error('Read changetype: before getting valid dn: line.')
        if changetype is not None:
            self._error('Two lines starting with changetype: in one record.')
        if attr_value not in CHANGE_TYPES:
            self._error('changetype value %s is invalid.' % attr_value)

    def _parse_entry_record(self, lines):
        """Parse a single entry record from a list of lines."""
        dn = None
        entry = OrderedDict()

        for line in lines:
            attr_type, attr_value = self._parse_attr(line)

            if attr_type == 'dn':
                self._check_dn(dn, attr_value)
                dn = attr_value
            elif attr_type == 'version' and dn is None:
                pass  # version = 1
            else:
                if dn is None:
                    self._error('First line of record does not start '
                                'with "dn:": %s' % attr_type)
                if attr_value is not None and \
                        attr_type.lower() not in self._ignored_attr_types:
                    if attr_type in entry:
                        entry[attr_type].append(attr_value)
                    else:
                        entry[attr_type] = [attr_value]

        return dn, entry

    def parse(self):
        """Iterate LDIF entry records.

        :rtype: Iterator[Tuple[string, Dict]]
        :return: (dn, entry)
        """
        for block in self._iter_blocks():
            yield self._parse_entry_record(block)
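
A minimal round-trip sketch of the API above, assuming the module shown here is importable as ldif3 (the DN and attribute values are illustrative only, not taken from the source):

    import io
    from ldif3 import LDIFParser, LDIFWriter

    # Write one entry record to an in-memory buffer (binary mode).
    buf = io.BytesIO()
    writer = LDIFWriter(buf)
    writer.unparse('cn=alice,dc=example,dc=com', {
        'objectClass': ['top', 'person'],
        'cn': ['alice'],
    })

    # Parse it back: parse() yields (dn, entry) tuples, where entry is an
    # OrderedDict mapping attribute types to lists of values.
    buf.seek(0)
    for dn, entry in LDIFParser(buf).parse():
        print(dn, dict(entry))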