123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- # -*- coding: utf-8 -*-
- import csv
- import numbers
-
- from itertools import izip
-
# Names that are re-exported from the standard ``csv`` module unchanged.
pass_throughs = [
    # dialect registry helpers
    'register_dialect', 'unregister_dialect', 'get_dialect', 'list_dialects',
    'field_size_limit',
    # dialect classes and the sniffer
    'Dialect', 'excel', 'excel_tab', 'Sniffer',
    # quoting constants
    'QUOTE_ALL', 'QUOTE_MINIMAL', 'QUOTE_NONNUMERIC', 'QUOTE_NONE',
    # exception type
    'Error',
]

# Public API: the wrappers defined in this module, then the pass-throughs.
__all__ = ['reader', 'writer', 'DictReader', 'DictWriter']
__all__ += pass_throughs

# Bind every pass-through name in this module's namespace to the matching
# ``csv`` attribute.
for prop in pass_throughs:
    globals()[prop] = getattr(csv, prop)
-
-
def _stringify(s, encoding, errors):
    """Convert one cell value into something ``csv.writer`` can emit.

    :param s: the cell value.
    :param encoding: codec name used to encode ``unicode`` values.
    :param errors: error-handling scheme passed to ``unicode.encode``.
    :returns: ``''`` for ``None``; the encoded byte string for ``unicode``;
        numbers and byte strings unchanged; ``str(s)`` for anything else.
    """
    if s is None:
        return ''
    if isinstance(s, unicode):
        return s.encode(encoding, errors)
    if isinstance(s, numbers.Number):
        # Leave numbers alone so csv.QUOTE_NONNUMERIC can do its thing.
        return s
    if isinstance(s, str):
        return s
    return str(s)
-
-
def _stringify_list(l, encoding, errors='strict'):
    """Apply ``_stringify`` to every item of ``l`` and return the new list.

    :param l: iterable of cell values.
    :param encoding: codec name forwarded to ``_stringify``.
    :param errors: error-handling scheme forwarded to ``_stringify``.
    :raises csv.Error: if a ``TypeError`` occurs while iterating or
        converting (for example when ``l`` is not iterable).
    """
    converted = []
    try:
        for cell in iter(l):
            converted.append(_stringify(cell, encoding, errors))
    except TypeError as exc:
        raise csv.Error(str(exc))
    return converted
-
-
def _unicodify(s, encoding):
    """Decode a byte string to ``unicode``; pass every other value through.

    :param s: the value to convert.
    :param encoding: codec name used to decode ``str`` values.
    :returns: ``s.decode(encoding)`` when ``s`` is a ``str``; otherwise ``s``
        itself (``None``, ``unicode``, ``int``, ``float`` and any other type
        are returned unchanged).
    """
    passthrough = s is None or isinstance(s, (unicode, int, float))
    if not passthrough and isinstance(s, str):
        return s.decode(encoding)
    return s
-
-
class UnicodeWriter(object):
    """Wrapper around ``csv.writer`` that encodes each row before writing.

    Every row is passed through ``_stringify_list`` with the configured
    ``encoding`` and ``errors`` and then handed to the underlying writer.

    >>> import unicodecsv
    >>> from cStringIO import StringIO
    >>> f = StringIO()
    >>> w = unicodecsv.writer(f, encoding='utf-8')
    >>> w.writerow((u'é', u'ñ'))
    >>> f.seek(0)
    >>> r = unicodecsv.reader(f, encoding='utf-8')
    >>> row = r.next()
    >>> row[0] == u'é'
    True
    >>> row[1] == u'ñ'
    True
    """

    def __init__(self, f, dialect=csv.excel, encoding='utf-8', errors='strict',
                 *args, **kwds):
        """Create the underlying ``csv.writer`` and remember the codec.

        :param f: file-like object handed to ``csv.writer``.
        :param dialect: dialect handed to ``csv.writer``.
        :param encoding: codec name used when encoding cells.
        :param errors: error-handling scheme used when encoding cells.
        :param args: extra positional arguments for ``csv.writer``.
        :param kwds: extra keyword arguments for ``csv.writer``.
        """
        self.encoding = encoding
        underlying = csv.writer(f, dialect, *args, **kwds)
        self.writer = underlying
        self.encoding_errors = errors

    def writerow(self, row):
        """Encode ``row`` and write it; return the underlying writer's result."""
        encoded_row = _stringify_list(row, self.encoding,
                                      self.encoding_errors)
        return self.writer.writerow(encoded_row)

    def writerows(self, rows):
        """Write each row of ``rows`` in turn via ``writerow``."""
        for record in rows:
            self.writerow(record)

    @property
    def dialect(self):
        """The dialect object of the underlying ``csv.writer``."""
        return self.writer.dialect


writer = UnicodeWriter
-
-
class UnicodeReader(object):
    """Wrapper around ``csv.reader`` that returns rows of ``unicode`` cells.

    Each cell produced by the underlying reader is converted with
    ``unicode(value, encoding, errors)``. When the dialect's quoting mode is
    ``csv.QUOTE_NONNUMERIC``, cells that are already ``float`` instances are
    returned unchanged.
    """

    def __init__(self, f, dialect=None, encoding='utf-8', errors='strict',
                 **kwds):
        """Create the underlying ``csv.reader`` and remember the codec.

        :param f: iterable of lines handed to ``csv.reader``.
        :param dialect: dialect handed to ``csv.reader``. When ``None`` and
            no formatting keyword is supplied, ``csv.excel`` is used.
        :param encoding: codec name used to decode cells.
        :param errors: error-handling scheme used when decoding cells.
        :param kwds: extra keyword arguments for ``csv.reader``.
        """
        format_params = ('delimiter', 'doublequote', 'escapechar',
                         'lineterminator', 'quotechar', 'quoting',
                         'skipinitialspace')

        # Fall back to the excel dialect only when the caller supplied
        # neither a dialect nor any formatting keyword of their own.
        if dialect is None:
            if not any(name in format_params for name in kwds):
                dialect = csv.excel
        self.reader = csv.reader(f, dialect, **kwds)
        self.encoding = encoding
        self.encoding_errors = errors
        # Equality, not a bitmask test: the QUOTE_* constants are enumerated
        # values rather than flags, and QUOTE_NONE & QUOTE_NONNUMERIC is
        # non-zero, which would switch on the float check needlessly.
        self._parse_numerics = (
            self.dialect.quoting == csv.QUOTE_NONNUMERIC)

    def next(self):
        """Return the next row as a list of decoded cells.

        Whatever the underlying reader raises (including ``StopIteration``
        at end of input) propagates to the caller.
        """
        row = self.reader.next()
        # Bind to locals: this runs once per cell of every row.
        encoding = self.encoding
        encoding_errors = self.encoding_errors
        unicode_ = unicode
        if self._parse_numerics:
            float_ = float
            # Floats produced by the csv module are kept as numbers.
            return [(value if isinstance(value, float_) else
                     unicode_(value, encoding, encoding_errors))
                    for value in row]
        return [unicode_(value, encoding, encoding_errors)
                for value in row]

    def __iter__(self):
        """Return ``self``; rows are produced by ``next``."""
        return self

    @property
    def dialect(self):
        """The dialect object of the underlying ``csv.reader``."""
        return self.reader.dialect

    @property
    def line_num(self):
        """The ``line_num`` counter of the underlying ``csv.reader``."""
        return self.reader.line_num


reader = UnicodeReader
-
-
class DictWriter(csv.DictWriter):
    """``csv.DictWriter`` whose rows are written through a ``UnicodeWriter``.

    >>> from cStringIO import StringIO
    >>> f = StringIO()
    >>> w = DictWriter(f, ['a', u'ñ', 'b'], restval=u'î')
    >>> w.writerow({'a':'1', u'ñ':'2'})
    >>> w.writerow({'a':'1', u'ñ':'2', 'b':u'ø'})
    >>> w.writerow({'a':u'é', u'ñ':'2'})
    >>> f.seek(0)
    >>> r = DictReader(f, fieldnames=['a', u'ñ'], restkey='r')
    >>> r.next() == {'a': u'1', u'ñ':'2', 'r': [u'î']}
    True
    >>> r.next() == {'a': u'1', u'ñ':'2', 'r': [u'\xc3\xb8']}
    True
    >>> r.next() == {'a': u'\xc3\xa9', u'ñ':'2', 'r': [u'\xc3\xae']}
    True
    """

    def __init__(self, csvfile, fieldnames, restval='',
                 extrasaction='raise', dialect='excel', encoding='utf-8',
                 errors='strict', *args, **kwds):
        """Initialise the base writer, then swap in a ``UnicodeWriter``.

        :param csvfile: file-like object to write to.
        :param fieldnames: column names, forwarded to ``csv.DictWriter``.
        :param restval: forwarded to ``csv.DictWriter``.
        :param extrasaction: forwarded to ``csv.DictWriter``.
        :param dialect: dialect for both the base class and the
            ``UnicodeWriter``.
        :param encoding: codec name used by the ``UnicodeWriter``.
        :param errors: error-handling scheme used by the ``UnicodeWriter``.
        """
        self.encoding = encoding
        # Called on the base class explicitly rather than through super().
        csv.DictWriter.__init__(
            self, csvfile, fieldnames, restval, extrasaction, dialect,
            *args, **kwds)
        # Replace the writer installed by the base class with the
        # encoding-aware one.
        self.writer = UnicodeWriter(
            csvfile, dialect, encoding=encoding, errors=errors,
            *args, **kwds)
        self.encoding_errors = errors

    def writeheader(self):
        """Write a row in which every field name is its own value."""
        names = self.fieldnames
        header_row = dict(zip(names, names))
        self.writerow(header_row)
-
-
class DictReader(csv.DictReader):
    """``csv.DictReader`` that reads through a ``UnicodeReader``.

    Field names given explicitly are encoded with ``encoding`` and ``errors``
    before being handed to the base class. Each row returned by ``next`` is
    re-keyed with the decoded (unicode) field names.

    >>> from cStringIO import StringIO
    >>> f = StringIO()
    >>> w = DictWriter(f, fieldnames=['name', 'place'])
    >>> w.writerow({'name': 'Cary Grant', 'place': 'hollywood'})
    >>> w.writerow({'name': 'Nathan Brillstone', 'place': u'øLand'})
    >>> w.writerow({'name': u'Will ø. Unicoder', 'place': u'éSpandland'})
    >>> f.seek(0)
    >>> r = DictReader(f, fieldnames=['name', 'place'])
    >>> print r.next() == {'name': 'Cary Grant', 'place': 'hollywood'}
    True
    >>> print r.next() == {'name': 'Nathan Brillstone', 'place': u'øLand'}
    True
    >>> print r.next() == {'name': u'Will ø. Unicoder', 'place': u'éSpandland'}
    True
    """

    def __init__(self, csvfile, fieldnames=None, restkey=None, restval=None,
                 dialect='excel', encoding='utf-8', errors='strict', *args,
                 **kwds):
        """Initialise the base reader, then swap in a ``UnicodeReader``.

        :param csvfile: iterable of lines to read from.
        :param fieldnames: optional column names; encoded with ``encoding``
            and ``errors`` before being passed to ``csv.DictReader``.
        :param restkey: forwarded to ``csv.DictReader``; its decoded form is
            used as the key for surplus cells in returned rows.
        :param restval: forwarded to ``csv.DictReader``.
        :param dialect: dialect for both the base class and the
            ``UnicodeReader``.
        :param encoding: codec name used for field names and cells.
        :param errors: error-handling scheme used for field names and cells.
        :raises csv.Error: if ``fieldnames`` is given but is not iterable
            (raised by ``_stringify_list``).
        """
        if fieldnames is not None:
            # Honour the caller's error-handling scheme for the field names
            # as well, not just for the cell data.
            fieldnames = _stringify_list(fieldnames, encoding, errors)
        csv.DictReader.__init__(self, csvfile, fieldnames, restkey, restval,
                                dialect, *args, **kwds)
        # Replace the reader installed by the base class with the decoding
        # one.
        self.reader = UnicodeReader(csvfile, dialect, encoding=encoding,
                                    errors=errors, *args, **kwds)
        if fieldnames is None and not hasattr(csv.DictReader, 'fieldnames'):
            # Python 2.5 fieldnames workaround.
            # See http://bugs.python.org/issue3436
            header_reader = UnicodeReader(csvfile, dialect, encoding=encoding,
                                          errors=errors, *args, **kwds)
            self.fieldnames = _stringify_list(header_reader.next(),
                                              header_reader.encoding, errors)

        # NOTE(review): where ``fieldnames`` is a lazy property on the base
        # class, this access presumably reads the header row through
        # ``self.reader`` -- confirm against the targeted Python versions.
        if self.fieldnames is not None:
            self.unicode_fieldnames = [_unicodify(f, encoding) for f in
                                       self.fieldnames]
        else:
            self.unicode_fieldnames = []

        self.unicode_restkey = _unicodify(restkey, encoding)

    def next(self):
        """Return the next row as a dict keyed by the unicode field names.

        Cells stored by the base class under ``restkey`` are copied to the
        result under ``unicode_restkey`` when that value is truthy.
        """
        row = csv.DictReader.next(self)
        result = dict((uni_key, row[str_key]) for (str_key, uni_key) in
                      izip(self.fieldnames, self.unicode_fieldnames))
        rest = row.get(self.restkey)
        if rest:
            result[self.unicode_restkey] = rest
        return result
|