Farid Ahmadian / Python

Python CSV Convertor

#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import os
import re
import json
from xml.etree import ElementTree
from xml.dom import minidom
from xml.etree.ElementTree import Element, SubElement, Comment
import time

class csv_format_convertor(object):
    """Convert a CSV file to JSON or XML with optional per-field validation.

    Typical use:
        1. create an instance with the path of an existing CSV file;
        2. optionally register validators with the setValidation* methods;
        3. choose the output format with setExportFormat ("json" or "xml");
        4. call saveExportFile to write the result next to the source file.

    Rows that are rejected (wrong number of columns, or a failed validator)
    are collected and written to a separate log file by saveExportFile.
    """

    # Defaults for the scalar state. The mutable containers (failed rows and
    # validator registrations) are created per instance in __init__ so that
    # two converters never share them.
    __source_file_address = None
    __source_file_name = None
    __source_file_headers = None
    __source_file_content = None
    __destination_file_format = None
    __first_save_action = True
    __url_validator_regex = re.compile(
    r'^(?:http|ftp)s?://' # http:// or https://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' #domain...
    r'localhost|' #localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
    r'(?::\d+)?' # optional port
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)

    def __str__(self):
        """Return a short hint about the next step, for interactive use."""
        if self.__source_file_address is None:
            return "Please give a valid file in initialization!"
        elif self.__destination_file_format is None:
            return "Please choice export format with setExportFormat method!"
        else:
            return "Now you can save desire format or change your export format"

    def __init__(self, source_file):
        """Step 1: remember the CSV location and read the file.

        Raises:
            ValueError: if source_file is not an existing file.
        """
        if not os.path.isfile(source_file):
            raise ValueError('Given File Does Not Exists!')
        # Per-instance containers; must exist before the file is parsed
        # because the parser records malformed rows.
        self.__validation_failed_rows = []
        self.__validation_utf8_check_field_list = []
        self.__validation_url_check_field_list = []
        self.__validation_int_range_check_field_list = {}
        self.__csv_extractor(source_file)

    def __csv_extractor(self, source_file):
        """Read the header row and the data rows from the CSV file.

        The first non-blank row is taken as the header. A data row is kept
        only when it has exactly as many fields as the header; any other row
        is recorded as a failed row. Blank lines are skipped.
        """
        # Local import so this class does not rely on a file-level import.
        import csv

        self.__source_file_address, self.__source_file_name = os.path.split(source_file)
        headers = None
        content_lines = []
        with open(source_file) as f:
            for row in csv.reader(f):
                if not row:
                    continue
                if headers is None:
                    headers = row
                elif len(row) == len(headers):
                    content_lines.append(row)
                else:
                    # Stored as a string because the log writer joins strings.
                    self.__validation_failed_rows.append(" ~ ".join(row))
        # An empty file yields an empty header list rather than None, so the
        # header membership checks and the exporters keep working.
        self.__source_file_headers = headers if headers is not None else []
        self.__source_file_content = content_lines

    def setExportFormat(self, export_format):
        """Step 2: choose the export format ("json" or "xml").

        Raises:
            ValueError: if export_format is not one of the supported formats.
        """
        if export_format not in ["json", "xml"]:
            raise ValueError('Given Format Does Not Exists!')
        self.__destination_file_format = export_format

    def saveExportFile(self):
        """Step 3: export the valid rows in the chosen format.

        The output file is written in the source file's directory, named
        after the source file with the export format as its extension.
        Validation runs on the first call only; later calls reuse the rows
        that passed. Pending failed rows are written to a timestamped
        "*_failed_rows_log.csv" file in the same directory and then cleared.

        Returns:
            A message naming the written file (and the log file, if any).

        Raises:
            ValueError: if no valid export format was chosen, or if writing
                to disk fails.
        """
        if self.__first_save_action:
            self.__validateCSV()
            self.__first_save_action = False

        if self.__destination_file_format == "json":
            result = self.__turnToJsonFormat()
        elif self.__destination_file_format == "xml":
            result = self.__turnToXmlFormat()
        else:
            raise ValueError('Please give a correct export format!')

        # os.path.join copes with an empty directory part (file given by bare
        # name); splitext copes with extensions of any length.
        base_name = os.path.splitext(self.__source_file_name)[0]
        destination_file = os.path.join(
            self.__source_file_address,
            base_name + "." + self.__destination_file_format)
        try:
            with open(destination_file, "w") as text_file:
                text_file.write(result)
            message = "Result saved on disk in: " + destination_file
            if self.__validation_failed_rows:
                # NOTE(review): ':' in the timestamp is not a valid file name
                # character on Windows.
                error_log_destination_file = os.path.join(
                    self.__source_file_address,
                    time.strftime("%Y-%m-%d-%H:%M") + "_failed_rows_log.csv")
                with open(error_log_destination_file, "w") as text_file:
                    text_file.write("\n\t".join(self.__validation_failed_rows))
                message += "\nAlso, Some invalid field found and saved in: " + error_log_destination_file
                self.__validation_failed_rows = []
            return message
        except (IOError, OSError) as error:
            # Keep the ValueError contract but do not hide the real cause.
            raise ValueError('Ooops, some error happened in writing file on disk! ' + str(error))

    def __validateCSV(self):
        """Keep only the rows whose fields all pass their validators.

        Rejected rows are appended to the failed-rows list as
        " ~ "-joined strings.
        """
        headers = self.__source_file_headers
        valid_rows = []
        for item in self.__source_file_content:
            if all(self.__checkValidation(name, value)
                   for name, value in zip(headers, item)):
                valid_rows.append(item)
            else:
                self.__validation_failed_rows.append(" ~ ".join(item))
        self.__source_file_content = valid_rows

    def __checkValidation(self, field_name, field_value):
        """Run the validator registered for field_name, if any.

        A field with no registered validator is always valid. When a field
        is registered for several checks, only the first match in the order
        UTF-8, URL, integer range is applied.
        """
        if field_name in self.__validation_utf8_check_field_list:
            return self.__utf8Validator(field_value)
        elif field_name in self.__validation_url_check_field_list:
            return self.__urlValidator(field_value)
        elif field_name in self.__validation_int_range_check_field_list:
            return self.__intRangeValidator(field_name, field_value)
        else:
            return True

    def __utf8Validator(self, field_value):
        """Return True if field_value is valid UTF-8."""
        if not isinstance(field_value, bytes):
            # Already-decoded text has nothing left to decode.
            return True
        try:
            field_value.decode('utf-8')
            return True
        except UnicodeDecodeError:
            return False

    def __urlValidator(self, field_value):
        """Return True if field_value matches the URL pattern."""
        return self.__url_validator_regex.match(field_value) is not None

    def __intRangeValidator(self, field_name, field_value):
        """Return True if field_value is an integer within the registered range.

        The range is inclusive on both ends. A value that cannot be
        converted to int is invalid.
        """
        minimum, maximum = self.__validation_int_range_check_field_list[field_name]
        try:
            return minimum <= int(field_value) <= maximum
        except (TypeError, ValueError):
            return False

    def setValidationUtf8CheckFieldList(self, field_name):
        """Register field_name for the UTF-8 check.

        Raises:
            ValueError: if field_name is not a column of the CSV header.
        """
        if field_name not in self.__source_file_headers:
            raise ValueError('This field does not exist in given csv file header!')
        self.__validation_utf8_check_field_list.append(field_name)

    def setValidationUrlCheckFieldList(self, field_name):
        """Register field_name for the URL check.

        Raises:
            ValueError: if field_name is not a column of the CSV header.
        """
        if field_name not in self.__source_file_headers:
            raise ValueError('This field does not exist in given csv file header!')
        self.__validation_url_check_field_list.append(field_name)

    def setValidationIntRangeCheckFieldList(self, field_name, minimum_number, maximum_number):
        """Register field_name for an inclusive integer range check.

        Raises:
            ValueError: if field_name is not a column of the CSV header.
        """
        if field_name not in self.__source_file_headers:
            raise ValueError('This field does not exist in given csv file header!')
        self.__validation_int_range_check_field_list[field_name] = (minimum_number, maximum_number)

    def __turnToJsonFormat(self):
        """Return the rows as a JSON string: a list of header-to-value objects."""
        headers = self.__source_file_headers
        return json.dumps([dict(zip(headers, item))
                           for item in self.__source_file_content])

    def __turnToXmlFormat(self):
        """Return the rows as a pretty-printed XML string.

        Each row becomes an <item> element under <root>, with one child
        element per column named after the header.
        """
        headers = self.__source_file_headers
        root = Element('root')
        for item in self.__source_file_content:
            child = Element('item')
            for name, value in zip(headers, item):
                element = SubElement(child, name)
                element.text = str(value)
            root.append(child)
        return self.__prettify(root)

    def __prettify(self, elem):
        """Return a pretty-printed XML string for the Element."""
        rough_string = ElementTree.tostring(elem, 'utf-8')
        reparsed = minidom.parseString(rough_string)
        return reparsed.toprettyxml(indent="  ")

if __name__ == "__main__":
    # Command-line entry point: convert the CSV given as the first argument
    # to XML and then to JSON, validating the "uri", "name" and "stars"
    # columns. Any failure is reported as a single "Error: ..." line.
    if len(sys.argv) < 2:
        print("please give a csv file address")
    else:
        try:
            converter = csv_format_convertor(sys.argv[1])
            converter.setValidationUrlCheckFieldList("uri")
            converter.setValidationUtf8CheckFieldList("name")
            converter.setValidationIntRangeCheckFieldList("stars", 0, 5)
            # Export once per format; each save reports where it wrote.
            for export_format in ("xml", "json"):
                converter.setExportFormat(export_format)
                print(converter.saveExportFile())
        except Exception as error:
            print("Error: %s" % str(error))

BY: Farid Ahmadian
TAG: python, csv, script
DATE: 2017-04-23 10:36:53


Farid Ahmadian / Python [ TXT ]

With many thanks and best wishes to dear Pejman Moghadam, who taught me a lot about Linux and life :)