#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# @file: railgun/common/csvdata.py
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# This file is released under BSD 2-clause license.
"""
Utilities to load objects from csv file.
Suppose you are given csv data like this:

.. code-block:: csv

    name,student-number,year,registered
    Jenny,00001,21,True
    Bob,000002,22,False

You may derive an object schema from :class:`CsvSchema`, giving the names and
types of the columns::

    class MyObjectSchema(CsvSchema):
        name = CsvString()
        stdno = CsvString(name='student-number')
        year = CsvInteger()
        registered = CsvBoolean()

Then you may get the objects by::

    with open('data.csv', 'rb') as f:
        for obj in CsvSchema.LoadCSV(MyObjectSchema, f):
            print obj

.. note::

    The first row in the csv file must be the column names!  However, they
    need not be in the same order as defined in the schema.
    :class:`CsvSchema` uses this row to detect the order.
"""
import csv
from railgun.common.lazy_i18n import lazy_gettext
class CsvField(object):
    """Define a column in csv file.

    This is the base class for all types of columns.  You may inherit this
    class to provide your own field type, for example::

        import json

        class CsvJsonField(CsvField):
            def fromString(self, value):
                return json.loads(value)

            def toString(self, value):
                return json.dumps(value)

    :param name: Give the name of column.  If not given, use the attribute
        name in :class:`CsvSchema`.
    :type name: :class:`str`
    :param default: Give the default value of this column.  If given, this
        value will be used if such column does not exist.  If not given,
        :class:`KeyError` will be raised if not exist.
    """

    def __init__(self, **kwargs):
        # If name is given, this field will use a different name from its
        # attribute name in Schema.
        self.name = kwargs.get('name', None)
        # If default is given, this field will give the default value if
        # field does not exist in CSV file.  If not, raise KeyError.
        # Presence of the keyword is what counts, so that `default=None`
        # is still a usable default value.
        self.has_default = 'default' in kwargs
        self.default = kwargs.get('default', None)

    def fromString(self, value):
        """Convert `value` from :class:`str` to field type.

        Derived classes should overwrite this.  You may raise any exceptions
        as you need.  The base implementation performs no conversion and
        returns :data:`None`.

        :return: Converted object.
        """
        pass

    def parseString(self, value):
        """Convert `value` with :meth:`fromString`, normalizing failures.

        :param value: The raw cell text to convert.
        :return: Whatever :meth:`fromString` returns.
        :raises ValueError: If :meth:`fromString` raises any
            :class:`Exception`; the message names the offending value and
            the field class.
        """
        try:
            return self.fromString(value)
        except Exception:
            raise ValueError(lazy_gettext(
                'Cannot convert "%(value)s" to %(type)s.',
                value=value, type=self.__class__.__name__
            ))

    def toString(self, value):
        """Convert `value` from field type to :class:`str`.

        You must return such string representations that `fromString`
        can convert it back.

        :return: Converted str.
        """
        return str(value)

    def __repr__(self):
        # Class name first, then only the options that were actually set.
        info = ['%s' % self.__class__.__name__]
        if self.name:
            info.append('name="%s"' % self.name)
        if self.has_default:
            info.append('default=%s' % repr(self.default))
        return '<Field(%s)>' % ', '.join(info)
class CsvInteger(CsvField):
    """Define an integral field in csv file."""

    def fromString(self, value):
        """Convert `value` to :class:`int`.

        :return: The result of ``int(value)``.
        """
        return int(value)
class CsvString(CsvField):
    """Define a string field in csv file."""

    def fromString(self, value):
        """Return `value` unchanged; the cell text is already the value."""
        return value
class CsvFloat(CsvField):
    """Define a float field in csv file."""

    def fromString(self, value):
        """Convert `value` to :class:`float`.

        :return: The result of ``float(value)``.
        """
        return float(value)
class CsvBoolean(CsvField):
    """Define a boolean field in csv file.

    String literals will be converted according to the following table
    (leading and trailing whitespace is ignored):

    .. tabularcolumns:: |p{4cm}|p{11cm}|

    =======================  ================================================
    Value                    Literals (Case Insensitive)
    =======================  ================================================
    :data:`True`             'true', 'on', '1', 'yes'
    :data:`False`            'false', 'off', '0', 'no'
    :class:`ValueError`      Any other literal
    =======================  ================================================
    """

    def fromString(self, value):
        """Convert a boolean literal to :class:`bool`.

        :param value: The raw cell text.
        :return: :data:`True` or :data:`False` according to the class table.
        :raises ValueError: If `value` is not one of the known literals.
        """
        # csv.reader keeps the blank in cells such as `a, True`, so trim
        # before matching; the error message still shows the raw text.
        val = value.strip().lower()
        if val in ('true', 'on', '1', 'yes'):
            return True
        if val in ('false', 'off', '0', 'no'):
            return False
        raise ValueError('%s is not a boolean value.' % value)
class CsvSchema(object):
    """Represent a data schema on CSV file.

    Derive from this class and declare :class:`CsvField` instances as class
    attributes.  Only fields found in the ``__dict__`` of the class passed
    to :meth:`LoadCSV` / :meth:`SaveCSV` are considered.
    """

    @staticmethod
    def LoadCSV(cls, iterable):
        """Get iterable objects from given line `iterable` object.

        This is a generator: nothing is read from `iterable`, and no
        exception is raised, until the result is iterated.

        :param cls: The schema class whose :class:`CsvField` attributes
            describe the columns; it is instantiated with no arguments for
            every data row.
        :param iterable: Any object accepted by :func:`csv.reader`.  The
            first row must hold the column names.
        :return: Generator of `cls` instances, one per non-empty data row.
        :raises KeyError: If a field has no matching column and no default.
        :raises ValueError: If a cell cannot be converted by its field.
        """
        rdr = csv.reader(iterable)

        # Parse the header line.  Input without any row simply produces no
        # objects; letting StopIteration escape from a generator would be
        # turned into RuntimeError on Python 3.7+ (PEP 479).
        try:
            header_row = next(rdr)
        except StopIteration:
            return
        headers = {k: i for i, k in enumerate(header_row)}

        field_getter = {}
        for k, v in cls.__dict__.items():
            if not isinstance(v, CsvField):
                continue
            field_name = v.name if v.name else k
            if field_name in headers:
                # Set the getter to fetch the Nth column of a row, where
                # N is resolved once here rather than for every row.
                field_getter[k] = (
                    lambda row, idx=headers[field_name], col=v: (
                        col.parseString(row[idx])
                    )
                )
            elif v.has_default:
                # Not exist in CSV, if has default, use default value.
                field_getter[k] = lambda row, val=v: val.default
            else:
                # Not exist, no default, raise KeyError.
                raise KeyError(lazy_gettext(
                    'Field "%(field)s" not found in CSV data.',
                    field=field_name
                ))

        # Yield object from CSV one by one, skipping blank lines.
        for row in rdr:
            if not row:
                continue
            obj = cls()
            for f, g in field_getter.items():
                setattr(obj, f, g(row))
            yield obj

    @staticmethod
    def SaveCSV(cls, fileobj, items):
        """Write `items` to `fileobj` as csv data.

        A header row with the column names is written first, followed by
        one row per item.  Columns appear in the iteration order of
        ``cls.__dict__``.

        :param cls: The schema class whose :class:`CsvField` attributes
            describe the columns.
        :param fileobj: Any object accepted by :func:`csv.writer`.
        :param items: Iterable of objects carrying one attribute per field.
        """
        writer = csv.writer(fileobj)

        # Given attrname, field, get the field name.
        def FieldName(attrname, field):
            return field.name if field.name else attrname

        # Collect meta data once so header and rows share the same order.
        attrs = [(k, v) for k, v in cls.__dict__.items()
                 if isinstance(v, CsvField)]

        # Write the header.
        writer.writerow([FieldName(k, v) for k, v in attrs])

        # Write value rows.
        for itm in items:
            writer.writerow([v.toString(getattr(itm, k))
                             for k, v in attrs])