#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
"""
Python CSV Convertor
===============================

Read a CSV file, optionally validate selected fields, and export the valid
rows as JSON or XML next to the source file.

_BY: Farid Ahmadian_
_TAG: python, csv, script_
_DATE: 2017-04-23 10:36:53_
"""

import sys

# Python 2 only: force UTF-8 as the implicit str<->unicode codec.  Neither
# ``reload`` nor ``setdefaultencoding`` exists on Python 3, so the hack is
# guarded instead of crashing the import there.
if sys.version_info[0] == 2:
    reload(sys)  # noqa: F821 (Python 2 builtin)
    sys.setdefaultencoding('utf8')

import csv
import os
import re
import json
from xml.etree import ElementTree
from xml.dom import minidom
from xml.etree.ElementTree import Element, SubElement, Comment
import time


class csv_format_convertor():
    """Convert a CSV file to JSON or XML with optional field validation.

    Usage:
      1. instantiate with the path of the CSV file;
      2. optionally register validation rules with the
         ``setValidation*`` methods;
      3. pick the output format with ``setExportFormat``;
      4. call ``saveExportFile`` (may be repeated for several formats).
    """

    # Immutable defaults only.  Mutable state (validation lists, failed
    # rows) is created per instance in __init__ so that two convertors
    # never share rules or failed rows.
    __source_file_address = None
    __source_file_name = None
    __source_file_headers = None
    __source_file_content = None
    __destination_file_format = None
    __first_save_action = True

    __url_validator_regex = re.compile(
        r'^(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)

    def __str__(self):
        """Return a short hint about the next step (for interactive use)."""
        if self.__source_file_address is None:
            return "Please give a valid file in initialization!"
        elif self.__destination_file_format is None:
            return "Please choice export format with setExportFormat method!"
        else:
            return "Now you can save desire format or change your export format"

    def __init__(self, source_file):
        """Step 1: read the CSV file at ``source_file``.

        Raises:
            ValueError: if ``source_file`` is not an existing file.
        """
        if not os.path.isfile(source_file):
            raise ValueError('Given File Does Not Exists!')
        # Per-instance mutable state; must exist before parsing because
        # the parser records malformed rows as failed rows.
        self.__validation_failed_rows = []
        self.__validation_utf8_check_field_list = []
        self.__validation_url_check_field_list = []
        self.__validation_int_range_check_field_list = {}
        self.__csv_extractor(source_file)

    def __csv_extractor(self, source_file):
        """Parse the CSV file into a header list and a list of data rows.

        The first non-blank line is the header.  Blank lines are skipped.
        A data row with fewer columns than the header cannot be exported
        and is recorded as a failed row instead.
        """
        self.__source_file_address, self.__source_file_name = os.path.split(source_file)
        headers = None
        content_lines = []
        with open(source_file) as f:
            # Each line is stripped before parsing so trailing whitespace
            # and line endings never end up inside the last field.
            # skipinitialspace lets a quoted field follow ", " and still be
            # recognised as quoted.
            reader = csv.reader((line.strip() for line in f),
                                skipinitialspace=True)
            for row in reader:
                if not row:
                    continue
                if headers is None:
                    headers = row
                elif len(row) < len(headers):
                    self.__validation_failed_rows.append(" ~ ".join(row))
                else:
                    content_lines.append(row)
        # An empty file yields an empty header list rather than None so the
        # setValidation* methods raise their documented ValueError.
        self.__source_file_headers = headers if headers is not None else []
        self.__source_file_content = content_lines

    def setExportFormat(self, export_format):
        """Step 2: choose the export format, either "json" or "xml".

        Raises:
            ValueError: if ``export_format`` is not a supported format.
        """
        if export_format not in ("json", "xml"):
            raise ValueError('Given Format Does Not Exists!')
        self.__destination_file_format = export_format

    def saveExportFile(self):
        """Step 3: export the valid rows in the selected format.

        Validation runs once, on the first call.  The output is written
        next to the source file under the same base name with the format
        as extension.  Failed rows that have not been logged yet are
        written to a timestamped ``*_failed_rows_log.csv`` file in the
        same directory.

        Returns:
            A human-readable message naming the file(s) written.

        Raises:
            ValueError: if no valid export format is selected or a file
                cannot be written.
        """
        if self.__first_save_action:
            self.__validateCSV()
            self.__first_save_action = False

        if self.__destination_file_format == "json":
            result = self.__turnToJsonFormat()
        elif self.__destination_file_format == "xml":
            result = self.__turnToXmlFormat()
        else:
            raise ValueError('Please give a correct export format!')

        # splitext/join instead of name[:-4] and string concatenation:
        # works for any extension length and for a bare relative file name
        # (empty directory part), which previously resolved to "/<name>".
        base_name = os.path.splitext(self.__source_file_name)[0]
        destination_file = os.path.join(
            self.__source_file_address,
            base_name + "." + self.__destination_file_format)
        try:
            with open(destination_file, "w") as text_file:
                text_file.write(result)
            message = "Result saved on disk in: " + destination_file
            if self.__validation_failed_rows:
                # NOTE(review): the ":" in the timestamp is not a valid
                # file-name character on Windows; kept for compatibility
                # with existing log names.
                error_log_destination_file = os.path.join(
                    self.__source_file_address,
                    time.strftime("%Y-%m-%d-%H:%M") + "_failed_rows_log.csv")
                with open(error_log_destination_file, "w") as text_file:
                    text_file.write("\n\t".join(self.__validation_failed_rows))
                message += "\nAlso, Some invalid field found and saved in: " + error_log_destination_file
                # Logged rows are dropped so a later save does not log
                # them again.
                self.__validation_failed_rows = []
            return message
        except (IOError, OSError) as error:
            # Keep the ValueError contract but do not hide the cause.
            raise ValueError('Ooops, some error happened in writing file on disk! ' + str(error))

    def __validateCSV(self):
        """Apply the registered rules; keep valid rows, record the rest."""
        headers = self.__source_file_headers
        valid_rows = []
        for item in self.__source_file_content:
            if all(self.__checkValidation(name, value)
                   for name, value in zip(headers, item)):
                valid_rows.append(item)
            else:
                self.__validation_failed_rows.append(" ~ ".join(item))
        self.__source_file_content = valid_rows

    def __checkValidation(self, field_name, field_value):
        """Run the validator registered for ``field_name``.

        A field with no registered rule is always valid.
        """
        if field_name in self.__validation_utf8_check_field_list:
            return self.__utf8Validator(field_value)
        elif field_name in self.__validation_url_check_field_list:
            return self.__urlValidator(field_value)
        elif field_name in self.__validation_int_range_check_field_list:
            return self.__intRangeValidator(field_name, field_value)
        return True

    def __utf8Validator(self, field_value):
        """Return True if ``field_value`` is valid UTF-8.

        Byte strings are decoded to check them; a value that is already
        text (Python 3 ``str``) has nothing left to decode and is valid.
        """
        if not isinstance(field_value, bytes):
            return True
        try:
            field_value.decode('utf-8')
        except UnicodeDecodeError:
            return False
        return True

    def __urlValidator(self, field_value):
        """Return True if ``field_value`` matches the URL pattern."""
        return self.__url_validator_regex.match(field_value) is not None

    def __intRangeValidator(self, field_name, field_value):
        """Return True if ``field_value`` is an integer inside the
        inclusive range registered for ``field_name``.

        A non-numeric value or a missing rule yields False.
        """
        try:
            minimum = self.__validation_int_range_check_field_list[field_name][0]
            maximum = self.__validation_int_range_check_field_list[field_name][1]
            return minimum <= int(field_value) <= maximum
        except (KeyError, IndexError, ValueError, TypeError):
            return False

    def __require_header(self, field_name):
        """Raise ValueError unless ``field_name`` is a CSV header."""
        if field_name not in self.__source_file_headers:
            raise ValueError('This field does not exist in given csv file header!')

    def setValidationUtf8CheckFieldList(self, field_name):
        """Register a UTF-8 check for ``field_name``.

        Raises:
            ValueError: if ``field_name`` is not a header of the CSV file.
        """
        self.__require_header(field_name)
        self.__validation_utf8_check_field_list.append(field_name)

    def setValidationUrlCheckFieldList(self, field_name):
        """Register a URL check for ``field_name``.

        Raises:
            ValueError: if ``field_name`` is not a header of the CSV file.
        """
        self.__require_header(field_name)
        self.__validation_url_check_field_list.append(field_name)

    def setValidationIntRangeCheckFieldList(self, field_name, minimum_number, maximum_number):
        """Register an inclusive integer range check for ``field_name``.

        Raises:
            ValueError: if ``field_name`` is not a header of the CSV file.
        """
        self.__require_header(field_name)
        self.__validation_int_range_check_field_list[field_name] = (minimum_number, maximum_number)

    def __turnToJsonFormat(self):
        """Return the rows as a JSON array of header -> value objects."""
        headers = self.__source_file_headers
        return json.dumps(
            [dict(zip(headers, item)) for item in self.__source_file_content])

    def __turnToXmlFormat(self):
        """Return the rows as pretty-printed XML.

        The document is a ``root`` element with one ``item`` child per
        row; each field becomes a sub-element named after its header.
        """
        headers = self.__source_file_headers
        root = Element('root')
        for item in self.__source_file_content:
            child = Element('item')
            for name, value in zip(headers, item):
                element = SubElement(child, name)
                element.text = str(value)
            root.append(child)
        return self.__prettify(root)

    def __prettify(self, elem):
        """Return a pretty-printed XML string for the Element."""
        rough_string = ElementTree.tostring(elem, 'utf-8')
        reparsed = minidom.parseString(rough_string)
        return reparsed.toprettyxml(indent="  ")


if __name__ == "__main__":
    if len(sys.argv) > 1:
        try:
            selectedCSV = csv_format_convertor(sys.argv[1])
            selectedCSV.setValidationUrlCheckFieldList("uri")
            selectedCSV.setValidationUtf8CheckFieldList("name")
            selectedCSV.setValidationIntRangeCheckFieldList("stars", 0, 5)
            selectedCSV.setExportFormat("xml")
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print(selectedCSV.saveExportFile())
            selectedCSV.setExportFormat("json")
            print(selectedCSV.saveExportFile())
        except Exception as e:
            print("Error: %s" % str(e))
    else:
        print("please give a csv file address")