Files
django-cresus/core/management/commands/import_csv.py
2017-09-24 11:42:05 +02:00

65 lines
2.9 KiB
Python

import datetime
import csv
import collections
from django.core.management.base import BaseCommand, CommandError
from core.models import Enregistrement, Etiquette
class Command(BaseCommand):
    """Import ``Enregistrement`` records from one or more CSV files.

    Every CSV row is read positionally, following ``mapping``. For each row
    the matching ``Etiquette`` is looked up (and created when missing), then
    an ``Enregistrement`` is saved unless an identical one already exists.

    With ``-t/--test`` the files are parsed and checked but nothing is
    written to the database.

    One progress marker is printed per row:
    ``.`` nothing saved (already present, or test mode), ``+`` record
    created, ``E`` row rejected. Details for every ``E`` are listed after
    the progress line of the file.
    """

    help = "Importe des enregistrement à partir d'un fichier .csv"
    # Meaning of the CSV columns, in positional order.
    mapping = ("year", "month", "day", "etiquette", "description", "montant")

    def add_arguments(self, parser):
        """Declare the positional ``files`` list and the ``-t/--test`` flag."""
        parser.add_argument("files", nargs="+", type=str)
        parser.add_argument("-t", "--test", action="store_true")

    def handle(self, *args, **options):
        """Import every file named in ``options['files']``.

        ``options['test']`` turns the run into a dry run. Errors affecting a
        single row are collected in ``self.errors`` and printed once the file
        has been processed; they do not stop the import.
        """
        dry_run = options['test']

        header = "== Importation au format .csv"
        if dry_run:
            header += " (test)"
        header += " ==\n%s" % (self.mapping,)
        self.stdout.write(self.style.MIGRATE_HEADING(header))

        # Find records in each file.
        for filepath in options['files']:
            self.errors = []
            self.stdout.write(self.style.WARNING("[-] %s. " % filepath), ending='')
            results = self._read_rows(filepath)
            self.stdout.write(self.style.SUCCESS("Ok (found %i results)." % len(results)))

            # Then, create new objects if necessary.
            # Labels already announced as "added" during a dry run, so each
            # one is reported once, exactly as in a real run.
            pending_labels = set()
            for line_number, result in results:
                location = "%s:%i" % (filepath, line_number)
                output, formatter = self._import_row(
                    result, dry_run, pending_labels, location)
                self.stdout.write(formatter(output), ending='')
            self.stdout.write('\n')

            for error in self.errors:
                self.stdout.write(self.style.ERROR('%s' % error))

    def _read_rows(self, filepath):
        """Return ``(line_number, fields)`` pairs for the usable rows of a file.

        ``fields`` maps each name of ``mapping`` to the raw string found in
        the corresponding column; extra columns are ignored. Blank lines are
        skipped silently, rows with too few columns are recorded in
        ``self.errors`` and skipped.
        """
        expected = len(self.mapping)
        rows = []
        # newline='' lets the csv module handle line endings itself, as its
        # documentation requires (needed for newlines inside quoted fields).
        with open(filepath, 'r', newline='') as f:
            reader = csv.reader(f)
            for row in reader:
                if not row:
                    continue
                if len(row) < expected:
                    self.errors.append(
                        "%s:%i: expected %i columns, found %i"
                        % (filepath, reader.line_num, expected, len(row)))
                    continue
                rows.append((reader.line_num, dict(zip(self.mapping, row))))
        return rows

    def _import_row(self, result, dry_run, pending_labels, location):
        """Process one parsed row and return ``(marker, style_function)``.

        ``location`` (``file:line``) prefixes any message appended to
        ``self.errors``. ``pending_labels`` is only used in dry-run mode to
        remember which missing labels were already reported.
        """
        try:
            date = datetime.date(
                int(result['year']), int(result['month']), int(result['day']))
            montant = float(result['montant'])
        except ValueError as e:
            # Non-numeric or out-of-range date/amount: reject this row only.
            self.errors.append("%s: %s" % (location, e))
            return "E", self.style.ERROR

        nom = result['etiquette']
        output = "."
        formatter = self.style.WARNING

        if dry_run:
            # A dry run must not touch the database, so only check whether
            # the label would have to be created.
            created = (nom not in pending_labels
                       and not Etiquette.objects.filter(nom=nom).exists())
            if created:
                pending_labels.add(nom)
            # Unsaved instance, used only for its string representation.
            etiquette = Etiquette(nom=nom)
        else:
            etiquette, created = Etiquette.objects.get_or_create(nom=nom)
            data = {
                'date': date,
                'description': str(result['description']),
                'etiquette': etiquette,
                'montant': montant,
            }
            # exists() instead of get(): several identical records may
            # already be stored, which would make get() raise
            # MultipleObjectsReturned.
            if not Enregistrement.objects.filter(**data).exists():
                try:
                    Enregistrement(**data).save()
                except Exception as e:
                    # Best effort: keep importing the remaining rows, but
                    # keep the reason so it is reported after the file.
                    self.errors.append("%s: %s" % (location, e))
                    output = "E"
                    formatter = self.style.ERROR
                else:
                    output = "+"
                    formatter = self.style.SUCCESS

        if created:
            output += "(added %s)" % etiquette
        return output, formatter