From 1e79e331261ab4382f32881a61fd9e85ef8e0058 Mon Sep 17 00:00:00 2001 From: Ziad Sawalha Date: Fri, 14 Mar 2014 16:26:50 -0500 Subject: [PATCH] Python 3 compatibility Including converting one of the test data files to UTF-8. It was Windows encoded which failed with python 3's file.read encoding. --- pythonwhois/parse.py | 133 ++++++++++++++++++++++--------------------- test.py | 59 ++++++++++++++----- 2 files changed, 112 insertions(+), 80 deletions(-) diff --git a/pythonwhois/parse.py b/pythonwhois/parse.py index dce46e4..b3d5168 100644 --- a/pythonwhois/parse.py +++ b/pythonwhois/parse.py @@ -1,14 +1,6 @@ from __future__ import print_function import re, sys, datetime -if sys.version_info[0] >= 3: - def iteritems(d): - return iter(d.items()) -else: - def iteritems(d): - return d.iteritems() - - grammar = { "_data": { 'id': ['Domain ID:[ ]*(?P.+)'], @@ -145,18 +137,29 @@ grammar = { } } + +if sys.version_info < (3, 0): + def is_string(data): + """Test for string with support for python 2.""" + return isinstance(data, basestring) +else: + def is_string(data): + """Test for string with support for python 3.""" + return isinstance(data, str) + + def parse_raw_whois(raw_data, normalized=[]): data = {} - + raw_data = [segment.replace("\r", "") for segment in raw_data] # Carriage returns are the devil - + for segment in raw_data: - for rule_key, rule_regexes in iteritems(grammar['_data']): - if rule_key not in data: + for rule_key, rule_regexes in grammar['_data'].items(): + if (rule_key in data) == False: for line in segment.splitlines(): for regex in rule_regexes: result = re.search(regex, line, re.IGNORECASE) - + if result is not None: val = result.group("val").strip() if val != "": @@ -164,7 +167,7 @@ def parse_raw_whois(raw_data, normalized=[]): data[rule_key].append(val) except KeyError as e: data[rule_key] = [val] - + # Whois.com is a bit special... Fabulous.com also seems to use this format. 
match = re.search("Name Servers:([/s/S]+)\n\n", segment) if match is not None: @@ -214,43 +217,43 @@ def parse_raw_whois(raw_data, normalized=[]): match = re.search('ren-status:\s*(.+)', segment) if match is not None: data["status"].insert(0, match.group(1).strip()) - + data["contacts"] = parse_registrants(raw_data) - + # Parse dates try: data['expiration_date'] = remove_duplicates(data['expiration_date']) data['expiration_date'] = parse_dates(data['expiration_date']) except KeyError as e: pass # Not present - + try: data['creation_date'] = remove_duplicates(data['creation_date']) data['creation_date'] = parse_dates(data['creation_date']) except KeyError as e: pass # Not present - + try: data['updated_date'] = remove_duplicates(data['updated_date']) data['updated_date'] = parse_dates(data['updated_date']) except KeyError as e: pass # Not present - + try: data['nameservers'] = remove_duplicates([ns.rstrip(".") for ns in data['nameservers']]) except KeyError as e: pass # Not present - + try: data['emails'] = remove_duplicates(data['emails']) except KeyError as e: pass # Not present - + try: data['registrar'] = remove_duplicates(data['registrar']) except KeyError as e: pass # Not present - + # Remove e-mail addresses if they are already listed for any of the contacts known_emails = [] for contact in ("registrant", "tech", "admin", "billing"): @@ -263,51 +266,51 @@ def parse_raw_whois(raw_data, normalized=[]): data['emails'] = [email for email in data["emails"] if email not in known_emails] except KeyError as e: pass # Not present - - for key in data.keys(): + + for key in list(data.keys()): if data[key] is None or len(data[key]) == 0: del data[key] - + data["raw"] = raw_data - + if normalized != []: data = normalize_data(data, normalized) - + return data def normalize_data(data, normalized): for key in ("nameservers", "emails", "whois_server"): if key in data and data[key] is not None and (normalized == True or key in normalized): - if isinstance(data[key], 
basestring): + if is_string(data[key]): data[key] = data[key].lower() else: data[key] = [item.lower() for item in data[key]] - + for key, threshold in (("registrar", 4), ("status", 3)): if key in data and data[key] is not None and (normalized == True or key in normalized): - if isinstance(data[key], basestring): + if is_string(data[key]): data[key] = normalize_name(data[key], abbreviation_threshold=threshold, length_threshold=1) else: data[key] = [normalize_name(item, abbreviation_threshold=threshold, length_threshold=1) for item in data[key]] - - for contact_type, contact in iteritems(data['contacts']): + + for contact_type, contact in data['contacts'].items(): if contact is not None: for key in ("email",): if key in contact and contact[key] is not None and (normalized == True or key in normalized): - if isinstance(contact[key], basestring): + if is_string(contact[key]): contact[key] = contact[key].lower() else: contact[key] = [item.lower() for item in contact[key]] - + for key in ("name", "street"): if key in contact and contact[key] is not None and (normalized == True or key in normalized): contact[key] = normalize_name(contact[key], abbreviation_threshold=3) - + for key in ("city", "organization", "state", "country"): if key in contact and contact[key] is not None and (normalized == True or key in normalized): contact[key] = normalize_name(contact[key], abbreviation_threshold=3, length_threshold=3) - - for key in contact.keys(): + + for key in list(contact.keys()): try: contact[key] = contact[key].strip(", ") except AttributeError as e: @@ -357,23 +360,23 @@ def normalize_name(value, abbreviation_threshold=4, length_threshold=8, lowercas def parse_dates(dates): global grammar parsed_dates = [] - + for date in dates: for rule in grammar['_dateformats']: result = re.match(rule, date, re.IGNORECASE) - + if result is not None: try: # These are always numeric. If they fail, there is no valid date present. 
year = int(result.group("year")) day = int(result.group("day")) - + # Detect and correct shorthand year notation if year < 60: year += 2000 elif year < 100: year += 1900 - + # This will require some more guesswork - some WHOIS servers present the name of the month try: month = int(result.group("month")) @@ -384,28 +387,28 @@ def parse_dates(dates): except KeyError as e: # Unknown month name, default to 0 month = 0 - + try: hour = int(result.group("hour")) except IndexError as e: hour = 0 except TypeError as e: hour = 0 - + try: minute = int(result.group("minute")) except IndexError as e: minute = 0 except TypeError as e: minute = 0 - + try: second = int(result.group("second")) except IndexError as e: second = 0 except TypeError as e: second = 0 - + break except ValueError as e: # Something went horribly wrong, maybe there is no valid date present? @@ -426,7 +429,7 @@ def parse_dates(dates): parsed_dates.append(datetime.datetime(year, day, month, hour, minute, second)) except UnboundLocalError as e: pass - + if len(parsed_dates) > 0: return parsed_dates else: @@ -434,11 +437,11 @@ def parse_dates(dates): def remove_duplicates(data): cleaned_list = [] - + for entry in data: if entry not in cleaned_list: cleaned_list.append(entry) - + return cleaned_list def preprocess_regex(regex): @@ -449,7 +452,7 @@ def parse_registrants(data): tech_contact = None billing_contact = None admin_contact = None - + registrant_regexes = [ " Registrant:[ ]*\n (?P.*)\n (?P.*)\n (?P.*)\n (?P.*), (?P.*) (?P.*)\n (?P.*)\n(?: Phone: (?P.*)\n)? Email: (?P.*)\n", # Corporate Domains, Inc. "Registrant:\n (?P.+)\n (?P.+)\n(?: (?P.*)\n)?(?: (?P.*)\n)? 
(?P.+), (?P.+)\n (?P.+)\n (?P.+)\n (?P.+)\n\n", # OVH @@ -501,7 +504,7 @@ def parse_registrants(data): "Admin Contact Information :[ ]*\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n\n", # GAL Communication " Technical contact:\n (?P.+)\n (?P.*)\n (?P.+)\n (?P.+) (?P\S+),[ ]+(?P.+)\n (?P.+)\n (?P.+)\n (?P.*)\n (?P.*)", # .am ] - + admin_contact_regexes = [ " Administrative Contact:[ ]*\n (?P.*)\n (?P.*)\n (?P.*)\n (?P.*), (?P.*) (?P.*)\n (?P.*)\n(?: Phone: (?P.*)\n)? Email: (?P.*)\n", # Corporate Domains, Inc. "Administrative Contact:\n (?P.+)\n (?P.+)\n(?: (?P.*)\n)?(?: (?P.*)\n)? (?P.+), (?P.+)\n (?P.+)\n (?P.+)\n (?P.+)\n\n", # OVH @@ -520,7 +523,7 @@ def parse_registrants(data): "Tech Contact Information :[ ]*\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n\n", # GAL Communication " Administrative contact:\n (?P.+)\n (?P.*)\n (?P.+)\n (?P.+) (?P\S+),[ ]+(?P.+)\n (?P.+)\n (?P.+)\n (?P.*)\n (?P.*)", # .am ] - + billing_contact_regexes = [ "Billing ID:(?P.+)\nBilling Name:(?P.*)\nBilling Organization:(?P.*)\nBilling Street1:(?P.*)\n(?:Billing Street2:(?P.*)\n)?(?:Billing Street3:(?P.*)\n)?Billing City:(?P.*)\nBilling State/Province:(?P.*)\nBilling Postal Code:(?P.*)\nBilling Country:(?P.*)\nBilling Phone:(?P.*)\n(?:Billing Phone Ext.:(?P.*)\n)?(?:Billing FAX:(?P.*)\n)?(?:Billing FAX Ext.:(?P.*)\n)?Billing Email:(?P.*)", # nic.pw "Billing Contact ID:\s*(?P.+)\nBilling Contact Name:\s*(?P.+)\nBilling Contact Organization:\s*(?P.*)\nBilling Contact Address1:\s*(?P.+)\nBilling Contact Address2:\s*(?P.*)\nBilling Contact City:\s*(?P.+)\nBilling Contact State/Province:\s*(?P.+)\nBilling Contact Postal Code:\s*(?P.+)\nBilling Contact Country:\s*(?P.+)\nBilling Contact Country Code:\s*(?P.+)\nBilling Contact Phone Number:\s*(?P.+)\nBilling Contact Email:\s*(?P.+)\n", # .CO Internet @@ -534,18 +537,18 @@ def parse_registrants(data): "Billing 
contact:\n(?: (?P.+)\n)? (?P.+)\n (?P.+)\n (?P.+)\n (?P.+), (?P.+) (?P.+) (?P.+)\n Phone: (?P.*)\n Fax: (?P.*)\n", # Fabulous.com "Billing Contact Information :[ ]*\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n[ ]+(?P.*)\n\n", # GAL Communication ] - + # Some registries use NIC handle references instead of directly listing contacts... - + nic_contact_regexes = [ "personname:\s*(?P.+)\norganization:\s*(?P.+)\nstreet address:\s*(?P.+)\npostal code:\s*(?P.+)\ncity:\s*(?P.+)\ncountry:\s*(?P.+)\n(?:phone:\s*(?P.+)\n)?(?:fax-no:\s*(?P.+)\n)?(?:e-mail:\s*(?P.+)\n)?nic-hdl:\s*(?P.+)\nchanged:\s*(?P.+)", # nic.at "person:\s*(?P.+)\nnic-hdl:\s*(?P.+)\n", # .ie "nic-hdl:\s*(?P.+)\ntype:\s*(?P.+)\ncontact:\s*(?P.+)\n(?:.+\n)*?(?:address:\s*(?P.+)\naddress:\s*(?P.+)\naddress:\s*(?P.+)\naddress:\s*(?P.+)\n)?(?:phone:\s*(?P.+)\n)?(?:fax-no:\s*(?P.+)\n)?(?:.+\n)*?(?:e-mail:\s*(?P.+)\n)?(?:.+\n)*?changed:\s*(?P[0-9]{2}\/[0-9]{2}\/[0-9]{4}).*\n", # AFNIC madness without country field "nic-hdl:\s*(?P.+)\ntype:\s*(?P.+)\ncontact:\s*(?P.+)\n(?:.+\n)*?(?:address:\s*(?P.+)\n)?(?:address:\s*(?P.+)\n)?(?:address:\s*(?P.+)\n)?(?:phone:\s*(?P.+)\n)?(?:fax-no:\s*(?P.+)\n)?(?:.+\n)*?(?:e-mail:\s*(?P.+)\n)?(?:.+\n)*?changed:\s*(?P[0-9]{2}\/[0-9]{2}\/[0-9]{4}).*\n", # AFNIC madness any country -at all- "nic-hdl:\s*(?P.+)\ntype:\s*(?P.+)\ncontact:\s*(?P.+)\n(?:.+\n)*?(?:address:\s*(?P.+)\n)?(?:address:\s*(?P.+)\n)?(?:address:\s*(?P.+)\n)?(?:address:\s*(?P.+)\n)?country:\s*(?P.+)\n(?:phone:\s*(?P.+)\n)?(?:fax-no:\s*(?P.+)\n)?(?:.+\n)*?(?:e-mail:\s*(?P.+)\n)?(?:.+\n)*?changed:\s*(?P[0-9]{2}\/[0-9]{2}\/[0-9]{4}).*\n", # AFNIC madness with country field - + ] - + nic_contact_references = { "registrant": [ "registrant:\s*(?P.+)", # nic.at @@ -562,7 +565,7 @@ def parse_registrants(data): "billing-c:\s*(?P.+)" # iis.se ] } - + # Why do the below? 
The below is meant to handle with an edge case (issue #2) where a partial match followed # by a failure, for a regex containing the \s*.+ pattern, would send the regex module on a wild goose hunt for # matching positions. The workaround is to use \S.* instead of .+, but in the interest of keeping the regexes @@ -572,35 +575,35 @@ def parse_registrants(data): tech_contact_regexes = [preprocess_regex(regex) for regex in tech_contact_regexes] admin_contact_regexes = [preprocess_regex(regex) for regex in admin_contact_regexes] billing_contact_regexes = [preprocess_regex(regex) for regex in billing_contact_regexes] - + for segment in data: for regex in registrant_regexes: match = re.search(regex, segment) if match is not None: registrant = match.groupdict() break - + for segment in data: for regex in tech_contact_regexes: match = re.search(regex, segment) if match is not None: tech_contact = match.groupdict() break - + for segment in data: for regex in admin_contact_regexes: match = re.search(regex, segment) if match is not None: admin_contact = match.groupdict() break - + for segment in data: for regex in billing_contact_regexes: match = re.search(regex, segment) if match is not None: billing_contact = match.groupdict() break - + # Find NIC handle contact definitions handle_contacts = [] for regex in nic_contact_regexes: @@ -608,7 +611,7 @@ def parse_registrants(data): matches = re.finditer(regex, segment) for match in matches: handle_contacts.append(match.groupdict()) - + # Find NIC handle references and process them for category in nic_contact_references: for regex in nic_contact_references[category]: @@ -631,11 +634,11 @@ def parse_registrants(data): elif category == "admin": admin_contact = data_reference break - + # Post-processing for obj in (registrant, tech_contact, billing_contact, admin_contact): if obj is not None: - for key in obj.keys(): + for key in list(obj.keys()): if obj[key] is None or obj[key].strip() == "": # Just chomp all surrounding whitespace 
del obj[key] else: @@ -672,7 +675,7 @@ def parse_registrants(data): if 'lastname' in obj: elements.append(obj["lastname"]) obj["name"] = " ".join(elements) - + return { "registrant": registrant, "tech": tech_contact, diff --git a/test.py b/test.py index 78dc30b..f478747 100755 --- a/test.py +++ b/test.py @@ -1,6 +1,27 @@ #!/usr/bin/env python2 -import sys, argparse, os, pythonwhois, json, datetime +import sys, argparse, os, pythonwhois, json, datetime, codecs +import pkgutil +import encodings + + +def get_codecs(): + """Dynamically get list of codecs in python.""" + false_positives = set(["aliases"]) + found = set(name for imp, name, ispkg in pkgutil.iter_modules(encodings.__path__) if not ispkg) + found.difference_update(false_positives) + return found + + +def read_encoded_file(file_path): + """Try reading file using all codecs, in sorted order. Return the first successful one.""" + for encoding in sorted(get_codecs()): + try: + with codecs.open(file_path, "r", encoding) as f: + return f.read() + except Exception: + pass + parser = argparse.ArgumentParser(description="Runs or modifies the test suite for python-whois.") parser.add_argument("mode", nargs=1, choices=["run", "update"], default="run", help="Whether to run or update the tests. 
Only update if you know what you're doing!") @@ -14,7 +35,7 @@ ENDC = '\033[0m' def encoded_json_dumps(obj): try: return json.dumps(obj, default=json_fallback) - except UnicodeDecodeError, e: + except UnicodeDecodeError as e: return json.dumps(recursive_encode(obj, "latin-1"), default=json_fallback) def json_fallback(obj): @@ -24,7 +45,7 @@ def json_fallback(obj): return obj def recursive_encode(obj, encoding): - for key in obj.keys(): + for key in list(obj.keys()): if isinstance(obj[key], dict): obj[key] = recursive_encode(obj[key], encoding) elif isinstance(obj[key], list): @@ -74,18 +95,26 @@ if args.mode[0] == "run": suites = [] for target in targets: try: - with open(os.path.join("test/data", target), "r") as f: + with codecs.open(os.path.join("test/data", target), "r") as f: data = f.read().split("\n--\n") - except IOError, e: + except IOError as e: sys.stderr.write("Invalid domain %(domain)s specified. No test case or base data exists.\n" % {"domain": target}) errors = True continue - try: - with open(os.path.join("test/target_default", target), "r") as f: + except UnicodeDecodeError: + try: + # Try cp1252 (ufpa.br uses that) + with codecs.open(os.path.join("test/data", target), "r", 'cp1252') as f: + data = f.read().split("\n--\n") + except UnicodeDecodeError as e: + # Fall back to trying all registered codecs + data = read_encoded_file(os.path.join("test/data", target)).split("\n--\n") + try: + with codecs.open(os.path.join("test/target_default", target), "r") as f: default = f.read() - with open(os.path.join("test/target_normalized", target), "r") as f: + with codecs.open(os.path.join("test/target_normalized", target), "r") as f: normalized = f.read() - except IOError, e: + except IOError as e: sys.stderr.write("Missing target data for domain %(domain)s. 
Run `./test.py update %(domain)s` to correct this, after verifying that pythonwhois can correctly parse this particular domain.\n" % {"domain": target}) errors = True continue @@ -152,10 +181,10 @@ elif args.mode[0] == "update": updates = [] for target in targets: try: - with open(os.path.join("test/data", target), "r") as f: + with codecs.open(os.path.join("test/data", target), "r") as f: data = f.read().split("\n--\n") updates.append((target, data)) - except IOError, e: + except IOError as e: sys.stderr.write("Invalid domain %(domain)s specified. No base data exists.\n" % {"domain": target}) errors = True continue @@ -166,8 +195,8 @@ elif args.mode[0] == "update": for target, data in updates: default = pythonwhois.parse.parse_raw_whois(data) normalized = pythonwhois.parse.parse_raw_whois(data, normalized=True) - with open(os.path.join("test/target_default", target), "w") as f: + with codecs.open(os.path.join("test/target_default", target), "w") as f: f.write(encoded_json_dumps(default)) - with open(os.path.join("test/target_normalized", target), "w") as f: - f.write(encoded_json_dumps(normalized)) - print "Generated target data for %s." % target + with codecs.open(os.path.join("test/target_normalized", target), "w") as f: + f.write(encoded_json_dumps(normalized)) + print("Generated target data for %s." % target)