#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import os
import re
import json
from xml.etree import ElementTree
from xml.dom import minidom
from xml.etree.ElementTree import Element, SubElement, Comment
import time
class csv_format_convertor(object):
    """Convert a simple CSV file to JSON or XML with optional field validation.

    Typical usage:
        1. Instantiate with the path of an existing CSV file; the file is
           read and split into a header row and data rows immediately.
        2. Optionally register validators with the
           ``setValidation*CheckFieldList`` methods.
        3. Pick the output format with ``setExportFormat("json" | "xml")``.
        4. Call ``saveExportFile()``; the result is written next to the
           source file and a status message is returned.

    Rows that cannot be parsed or that fail validation are collected and
    written to a ``*_failed_rows_log.csv`` file by ``saveExportFile()``.
    """

    # Only immutable defaults live on the class.  The mutable containers
    # (validator lists / failed rows) are created per instance in __init__
    # so that two convertors never share validators or failed rows.
    __source_file_address = None
    __source_file_name = None
    __source_file_headers = None
    __source_file_content = None
    __destination_file_format = None
    __first_save_action = True

    __url_validator_regex = re.compile(
        r'^(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)

    def __str__(self):
        """Return a short hint about the next step (for interactive use)."""
        if self.__source_file_address is None:
            return "Please give a valid file in initialization!"
        elif self.__destination_file_format is None:
            return "Please choice export format with setExportFormat method!"
        else:
            return "Now you can save desire format or change your export format"

    def __init__(self, source_file):
        """Step 1: remember the CSV location and read the file.

        Raises:
            ValueError: if ``source_file`` is not an existing file.
        """
        if not os.path.isfile(source_file):
            raise ValueError('Given File Does Not Exists!')
        # Per-instance mutable state (see the note on the class attributes).
        self.__validation_failed_rows = []
        self.__validation_utf8_check_field_list = []
        self.__validation_url_check_field_list = []
        self.__validation_int_range_check_field_list = {}
        self.__csv_extractor(source_file)

    def __csv_extractor(self, source_file):
        """Split the CSV file into a header row and a list of data rows.

        Parsing is deliberately simple: every double-quoted section of a
        line is replaced by a ``#`` placeholder before splitting on commas,
        and the first quoted value is then stored in column index 1.
        NOTE(review): this assumes the quoted field is always the second
        column -- confirm against the expected input files.

        Data rows without any quoted section, or with fewer than two
        columns, are recorded as failed rows instead of being exported.
        Blank lines are skipped.
        """
        self.__source_file_address, self.__source_file_name = os.path.split(source_file)
        headers = None
        content_lines = []
        with open(source_file) as f:
            for line in f:
                line = line.strip()
                if not line:
                    # e.g. a trailing empty line; nothing to parse or report
                    continue
                matches = re.findall(r'\"(.+?)\"', line)
                fields = re.sub(r'\"(.+?)\"', '#', line).split(",")
                if headers is None:
                    headers = fields
                elif matches and len(fields) > 1:
                    fields[1] = matches[0]
                    content_lines.append(fields)
                else:
                    # Stored as text, the same shape __validateCSV uses, so
                    # the failed-rows log can join all entries.
                    self.__validation_failed_rows.append(" ~ ".join(fields))
        # An empty file yields an empty header list rather than None so the
        # header membership checks in the setters keep working.
        self.__source_file_headers = headers if headers is not None else []
        self.__source_file_content = content_lines

    def setExportFormat(self, export_format):
        """Step 2: choose the export format, ``"json"`` or ``"xml"``.

        Raises:
            ValueError: if ``export_format`` is not a supported format.
        """
        if export_format not in ["json", "xml"]:
            raise ValueError('Given Format Does Not Exists!')
        self.__destination_file_format = export_format

    def saveExportFile(self):
        """Step 3: export the valid rows in the selected format.

        Validation runs once, on the first call.  The output file is written
        in the source file's directory, using the source file's name with
        its extension replaced by the export format.  If failed rows are
        pending they are written to a timestamped log file in the same
        directory and then cleared.

        Returns:
            A human-readable message naming the file(s) written.

        Raises:
            ValueError: if no valid export format is selected, or if
                writing to disk fails.
        """
        if self.__first_save_action:
            self.__validateCSV()
            self.__first_save_action = False

        if self.__destination_file_format == "json":
            result = self.__turnToJsonFormat()
        elif self.__destination_file_format == "xml":
            result = self.__turnToXmlFormat()
        else:
            raise ValueError('Please give a correct export format!')

        try:
            # os.path.join copes with an empty directory part (a file given
            # relative to the current directory); plain concatenation with
            # os.path.sep would point at the filesystem root instead.
            base_name = os.path.splitext(self.__source_file_name)[0]
            destination_file = os.path.join(
                self.__source_file_address,
                base_name + "." + self.__destination_file_format)
            with open(destination_file, "w") as text_file:
                text_file.write(result)
            message = "Result saved on disk in: " + destination_file
            if len(self.__validation_failed_rows) > 0:
                error_log_destination_file = os.path.join(
                    self.__source_file_address,
                    time.strftime("%Y-%m-%d-%H:%M") + "_failed_rows_log.csv")
                with open(error_log_destination_file, "w") as text_file:
                    text_file.write(str("\n\t".join(self.__validation_failed_rows)))
                message += "\nAlso, Some invalid field found and saved in: " + error_log_destination_file
                self.__validation_failed_rows = []
            return message
        except Exception as error:
            # Callers rely on ValueError here; keep the type but do not hide
            # the underlying cause.
            raise ValueError('Ooops, some error happened in writing file on disk! ' + str(error))

    def __validateCSV(self):
        """Keep only rows that pass every registered validator.

        Rows with fewer columns than the header, or with at least one
        invalid field, are moved to the failed-rows list.
        """
        headers = self.__source_file_headers
        valid_rows = []
        for row in self.__source_file_content:
            is_valid = len(row) >= len(headers) and all(
                self.__checkValidation(name, value)
                for name, value in zip(headers, row))
            if is_valid:
                valid_rows.append(row)
            else:
                self.__validation_failed_rows.append(" ~ ".join(row))
        self.__source_file_content = valid_rows

    def __checkValidation(self, field_name, field_value):
        """Run the validator registered for ``field_name``, if any.

        Only the first matching kind is applied, in the order
        UTF-8, URL, integer range.  Fields without a validator are valid.
        """
        if field_name in self.__validation_utf8_check_field_list:
            return self.__utf8Validator(field_value)
        elif field_name in self.__validation_url_check_field_list:
            return self.__urlValidator(field_value)
        elif field_name in self.__validation_int_range_check_field_list:
            return self.__intRangeValidator(field_name, field_value)
        else:
            return True

    def __utf8Validator(self, field_value):
        """Return True if ``field_value`` is valid UTF-8.

        Byte strings are decoded to check them; values that are already
        decoded text have nothing left to verify.
        """
        if isinstance(field_value, bytes):
            try:
                field_value.decode('utf-8')
            except UnicodeDecodeError:
                return False
        return True

    def __urlValidator(self, field_value):
        """Return True if ``field_value`` matches the class URL pattern."""
        return self.__url_validator_regex.match(field_value) is not None

    def __intRangeValidator(self, field_name, field_value):
        """Return True if ``field_value`` is an integer within the range
        registered for ``field_name`` (both bounds inclusive).

        Non-numeric values and missing ranges count as invalid.
        """
        try:
            minimum, maximum = self.__validation_int_range_check_field_list[field_name]
            return minimum <= int(field_value) <= maximum
        except (KeyError, TypeError, ValueError):
            return False

    def __requireKnownHeader(self, field_name):
        """Raise ValueError unless ``field_name`` is a column of the CSV header."""
        if field_name not in self.__source_file_headers:
            raise ValueError('This field does not exist in given csv file header!')

    def setValidationUtf8CheckFieldList(self, field_name):
        """Register a UTF-8 check for the column ``field_name``.

        Raises:
            ValueError: if the column is not in the CSV header.
        """
        self.__requireKnownHeader(field_name)
        self.__validation_utf8_check_field_list.append(field_name)

    def setValidationUrlCheckFieldList(self, field_name):
        """Register a URL check for the column ``field_name``.

        Raises:
            ValueError: if the column is not in the CSV header.
        """
        self.__requireKnownHeader(field_name)
        self.__validation_url_check_field_list.append(field_name)

    def setValidationIntRangeCheckFieldList(self, field_name, minimum_number, maximum_number):
        """Register an inclusive integer range check for ``field_name``.

        Raises:
            ValueError: if the column is not in the CSV header.
        """
        self.__requireKnownHeader(field_name)
        self.__validation_int_range_check_field_list[field_name] = (minimum_number, maximum_number)

    def __turnToJsonFormat(self):
        """Return the rows as a JSON array of ``{header: value}`` objects."""
        headers = self.__source_file_headers
        records = [dict(zip(headers, row)) for row in self.__source_file_content]
        return json.dumps(records)

    def __turnToXmlFormat(self):
        """Return the rows as pretty-printed XML.

        The document is ``<root>`` with one ``<item>`` per row; each column
        becomes a child element named after its header.
        """
        headers = self.__source_file_headers
        root = Element('root')
        for row in self.__source_file_content:
            child = Element('item')
            for name, value in zip(headers, row):
                element = SubElement(child, name)
                element.text = str(value)
            root.append(child)
        return self.__prettify(root)

    def __prettify(self, elem):
        """Return a pretty-printed XML string for the Element."""
        rough_string = ElementTree.tostring(elem, 'utf-8')
        reparsed = minidom.parseString(rough_string)
        return reparsed.toprettyxml(indent=" ")
if __name__ == "__main__":
    # Command-line entry point: convert the CSV file named by the first
    # argument to XML and then to JSON, validating the "uri", "name" and
    # "stars" columns.  Any error is reported on standard output.
    if len(sys.argv) <= 1:
        print("please give a csv file address")
    else:
        try:
            converter = csv_format_convertor(sys.argv[1])
            converter.setValidationUrlCheckFieldList("uri")
            converter.setValidationUtf8CheckFieldList("name")
            converter.setValidationIntRangeCheckFieldList("stars", 0, 5)
            # Export order matters for the printed output: XML first, then JSON.
            for export_format in ("xml", "json"):
                converter.setExportFormat(export_format)
                print(converter.saveExportFile())
        except Exception as error:
            print("Error: %s" % str(error))
BY: Farid Ahmadian
TAG: python, csv, script
DATE: 2017-04-23 10:36:53
With many thanks and best wishes for dear Pejman Moghadam, someone who taught me a lot about Linux and life :)