Python 3 compatibility

Including converting one of the test data files to
UTF-8. It was Windows encoded which failed with
python 3's file.read encoding.
master
Ziad Sawalha 10 years ago
parent 1af983ad4f
commit 1e79e33126

@ -1,14 +1,6 @@
from __future__ import print_function
import re, sys, datetime
if sys.version_info[0] >= 3:
def iteritems(d):
return iter(d.items())
else:
def iteritems(d):
return d.iteritems()
grammar = {
"_data": {
'id': ['Domain ID:[ ]*(?P<val>.+)'],
@ -145,18 +137,29 @@ grammar = {
}
}
if sys.version_info < (3, 0):
def is_string(data):
"""Test for string with support for python 2."""
return isinstance(data, basestring)
else:
def is_string(data):
"""Test for string with support for python 3."""
return isinstance(data, str)
def parse_raw_whois(raw_data, normalized=[]):
data = {}
raw_data = [segment.replace("\r", "") for segment in raw_data] # Carriage returns are the devil
for segment in raw_data:
for rule_key, rule_regexes in iteritems(grammar['_data']):
if rule_key not in data:
for rule_key, rule_regexes in grammar['_data'].items():
if (rule_key in data) == False:
for line in segment.splitlines():
for regex in rule_regexes:
result = re.search(regex, line, re.IGNORECASE)
if result is not None:
val = result.group("val").strip()
if val != "":
@ -164,7 +167,7 @@ def parse_raw_whois(raw_data, normalized=[]):
data[rule_key].append(val)
except KeyError as e:
data[rule_key] = [val]
# Whois.com is a bit special... Fabulous.com also seems to use this format.
match = re.search("Name Servers:([/s/S]+)\n\n", segment)
if match is not None:
@ -214,43 +217,43 @@ def parse_raw_whois(raw_data, normalized=[]):
match = re.search('ren-status:\s*(.+)', segment)
if match is not None:
data["status"].insert(0, match.group(1).strip())
data["contacts"] = parse_registrants(raw_data)
# Parse dates
try:
data['expiration_date'] = remove_duplicates(data['expiration_date'])
data['expiration_date'] = parse_dates(data['expiration_date'])
except KeyError as e:
pass # Not present
try:
data['creation_date'] = remove_duplicates(data['creation_date'])
data['creation_date'] = parse_dates(data['creation_date'])
except KeyError as e:
pass # Not present
try:
data['updated_date'] = remove_duplicates(data['updated_date'])
data['updated_date'] = parse_dates(data['updated_date'])
except KeyError as e:
pass # Not present
try:
data['nameservers'] = remove_duplicates([ns.rstrip(".") for ns in data['nameservers']])
except KeyError as e:
pass # Not present
try:
data['emails'] = remove_duplicates(data['emails'])
except KeyError as e:
pass # Not present
try:
data['registrar'] = remove_duplicates(data['registrar'])
except KeyError as e:
pass # Not present
# Remove e-mail addresses if they are already listed for any of the contacts
known_emails = []
for contact in ("registrant", "tech", "admin", "billing"):
@ -263,51 +266,51 @@ def parse_raw_whois(raw_data, normalized=[]):
data['emails'] = [email for email in data["emails"] if email not in known_emails]
except KeyError as e:
pass # Not present
for key in data.keys():
for key in list(data.keys()):
if data[key] is None or len(data[key]) == 0:
del data[key]
data["raw"] = raw_data
if normalized != []:
data = normalize_data(data, normalized)
return data
def normalize_data(data, normalized):
for key in ("nameservers", "emails", "whois_server"):
if key in data and data[key] is not None and (normalized == True or key in normalized):
if isinstance(data[key], basestring):
if is_string(data[key]):
data[key] = data[key].lower()
else:
data[key] = [item.lower() for item in data[key]]
for key, threshold in (("registrar", 4), ("status", 3)):
if key in data and data[key] is not None and (normalized == True or key in normalized):
if isinstance(data[key], basestring):
if is_string(data[key]):
data[key] = normalize_name(data[key], abbreviation_threshold=threshold, length_threshold=1)
else:
data[key] = [normalize_name(item, abbreviation_threshold=threshold, length_threshold=1) for item in data[key]]
for contact_type, contact in iteritems(data['contacts']):
for contact_type, contact in data['contacts'].items():
if contact is not None:
for key in ("email",):
if key in contact and contact[key] is not None and (normalized == True or key in normalized):
if isinstance(contact[key], basestring):
if isinstance(contact[key], str):
contact[key] = contact[key].lower()
else:
contact[key] = [item.lower() for item in contact[key]]
for key in ("name", "street"):
if key in contact and contact[key] is not None and (normalized == True or key in normalized):
contact[key] = normalize_name(contact[key], abbreviation_threshold=3)
for key in ("city", "organization", "state", "country"):
if key in contact and contact[key] is not None and (normalized == True or key in normalized):
contact[key] = normalize_name(contact[key], abbreviation_threshold=3, length_threshold=3)
for key in contact.keys():
for key in list(contact.keys()):
try:
contact[key] = contact[key].strip(", ")
except AttributeError as e:
@ -357,23 +360,23 @@ def normalize_name(value, abbreviation_threshold=4, length_threshold=8, lowercas
def parse_dates(dates):
global grammar
parsed_dates = []
for date in dates:
for rule in grammar['_dateformats']:
result = re.match(rule, date, re.IGNORECASE)
if result is not None:
try:
# These are always numeric. If they fail, there is no valid date present.
year = int(result.group("year"))
day = int(result.group("day"))
# Detect and correct shorthand year notation
if year < 60:
year += 2000
elif year < 100:
year += 1900
# This will require some more guesswork - some WHOIS servers present the name of the month
try:
month = int(result.group("month"))
@ -384,28 +387,28 @@ def parse_dates(dates):
except KeyError as e:
# Unknown month name, default to 0
month = 0
try:
hour = int(result.group("hour"))
except IndexError as e:
hour = 0
except TypeError as e:
hour = 0
try:
minute = int(result.group("minute"))
except IndexError as e:
minute = 0
except TypeError as e:
minute = 0
try:
second = int(result.group("second"))
except IndexError as e:
second = 0
except TypeError as e:
second = 0
break
except ValueError as e:
# Something went horribly wrong, maybe there is no valid date present?
@ -426,7 +429,7 @@ def parse_dates(dates):
parsed_dates.append(datetime.datetime(year, day, month, hour, minute, second))
except UnboundLocalError as e:
pass
if len(parsed_dates) > 0:
return parsed_dates
else:
@ -434,11 +437,11 @@ def parse_dates(dates):
def remove_duplicates(data):
cleaned_list = []
for entry in data:
if entry not in cleaned_list:
cleaned_list.append(entry)
return cleaned_list
def preprocess_regex(regex):
@ -449,7 +452,7 @@ def parse_registrants(data):
tech_contact = None
billing_contact = None
admin_contact = None
registrant_regexes = [
" Registrant:[ ]*\n (?P<organization>.*)\n (?P<name>.*)\n (?P<street>.*)\n (?P<city>.*), (?P<state>.*) (?P<postalcode>.*)\n (?P<country>.*)\n(?: Phone: (?P<phone>.*)\n)? Email: (?P<email>.*)\n", # Corporate Domains, Inc.
"Registrant:\n (?P<name>.+)\n (?P<street1>.+)\n(?: (?P<street2>.*)\n)?(?: (?P<street3>.*)\n)? (?P<postalcode>.+), (?P<city>.+)\n (?P<country>.+)\n (?P<phone>.+)\n (?P<email>.+)\n\n", # OVH
@ -501,7 +504,7 @@ def parse_registrants(data):
"Admin Contact Information :[ ]*\n[ ]+(?P<firstname>.*)\n[ ]+(?P<lastname>.*)\n[ ]+(?P<organization>.*)\n[ ]+(?P<email>.*)\n[ ]+(?P<street>.*)\n[ ]+(?P<city>.*)\n[ ]+(?P<postalcode>.*)\n[ ]+(?P<phone>.*)\n[ ]+(?P<fax>.*)\n\n", # GAL Communication
" Technical contact:\n (?P<name>.+)\n (?P<organization>.*)\n (?P<street>.+)\n (?P<city>.+) (?P<state>\S+),[ ]+(?P<postalcode>.+)\n (?P<country>.+)\n (?P<email>.+)\n (?P<phone>.*)\n (?P<fax>.*)", # .am
]
admin_contact_regexes = [
" Administrative Contact:[ ]*\n (?P<organization>.*)\n (?P<name>.*)\n (?P<street>.*)\n (?P<city>.*), (?P<state>.*) (?P<postalcode>.*)\n (?P<country>.*)\n(?: Phone: (?P<phone>.*)\n)? Email: (?P<email>.*)\n", # Corporate Domains, Inc.
"Administrative Contact:\n (?P<name>.+)\n (?P<street1>.+)\n(?: (?P<street2>.*)\n)?(?: (?P<street3>.*)\n)? (?P<postalcode>.+), (?P<city>.+)\n (?P<country>.+)\n (?P<phone>.+)\n (?P<email>.+)\n\n", # OVH
@ -520,7 +523,7 @@ def parse_registrants(data):
"Tech Contact Information :[ ]*\n[ ]+(?P<firstname>.*)\n[ ]+(?P<lastname>.*)\n[ ]+(?P<organization>.*)\n[ ]+(?P<email>.*)\n[ ]+(?P<street>.*)\n[ ]+(?P<city>.*)\n[ ]+(?P<postalcode>.*)\n[ ]+(?P<phone>.*)\n[ ]+(?P<fax>.*)\n\n", # GAL Communication
" Administrative contact:\n (?P<name>.+)\n (?P<organization>.*)\n (?P<street>.+)\n (?P<city>.+) (?P<state>\S+),[ ]+(?P<postalcode>.+)\n (?P<country>.+)\n (?P<email>.+)\n (?P<phone>.*)\n (?P<fax>.*)", # .am
]
billing_contact_regexes = [
"Billing ID:(?P<handle>.+)\nBilling Name:(?P<name>.*)\nBilling Organization:(?P<organization>.*)\nBilling Street1:(?P<street1>.*)\n(?:Billing Street2:(?P<street2>.*)\n)?(?:Billing Street3:(?P<street3>.*)\n)?Billing City:(?P<city>.*)\nBilling State/Province:(?P<state>.*)\nBilling Postal Code:(?P<postalcode>.*)\nBilling Country:(?P<country>.*)\nBilling Phone:(?P<phone>.*)\n(?:Billing Phone Ext.:(?P<phone_ext>.*)\n)?(?:Billing FAX:(?P<fax>.*)\n)?(?:Billing FAX Ext.:(?P<fax_ext>.*)\n)?Billing Email:(?P<email>.*)", # nic.pw
"Billing Contact ID:\s*(?P<handle>.+)\nBilling Contact Name:\s*(?P<name>.+)\nBilling Contact Organization:\s*(?P<organization>.*)\nBilling Contact Address1:\s*(?P<street1>.+)\nBilling Contact Address2:\s*(?P<street2>.*)\nBilling Contact City:\s*(?P<city>.+)\nBilling Contact State/Province:\s*(?P<state>.+)\nBilling Contact Postal Code:\s*(?P<postalcode>.+)\nBilling Contact Country:\s*(?P<country>.+)\nBilling Contact Country Code:\s*(?P<country_code>.+)\nBilling Contact Phone Number:\s*(?P<phone>.+)\nBilling Contact Email:\s*(?P<email>.+)\n", # .CO Internet
@ -534,18 +537,18 @@ def parse_registrants(data):
"Billing contact:\n(?: (?P<organization>.+)\n)? (?P<name>.+)\n (?P<email>.+)\n (?P<street>.+)\n (?P<city>.+), (?P<state>.+) (?P<postalcode>.+) (?P<country>.+)\n Phone: (?P<phone>.*)\n Fax: (?P<fax>.*)\n", # Fabulous.com
"Billing Contact Information :[ ]*\n[ ]+(?P<firstname>.*)\n[ ]+(?P<lastname>.*)\n[ ]+(?P<organization>.*)\n[ ]+(?P<email>.*)\n[ ]+(?P<street>.*)\n[ ]+(?P<city>.*)\n[ ]+(?P<postalcode>.*)\n[ ]+(?P<phone>.*)\n[ ]+(?P<fax>.*)\n\n", # GAL Communication
]
# Some registries use NIC handle references instead of directly listing contacts...
nic_contact_regexes = [
"personname:\s*(?P<name>.+)\norganization:\s*(?P<organization>.+)\nstreet address:\s*(?P<street>.+)\npostal code:\s*(?P<postalcode>.+)\ncity:\s*(?P<city>.+)\ncountry:\s*(?P<country>.+)\n(?:phone:\s*(?P<phone>.+)\n)?(?:fax-no:\s*(?P<fax>.+)\n)?(?:e-mail:\s*(?P<email>.+)\n)?nic-hdl:\s*(?P<handle>.+)\nchanged:\s*(?P<changedate>.+)", # nic.at
"person:\s*(?P<name>.+)\nnic-hdl:\s*(?P<handle>.+)\n", # .ie
"nic-hdl:\s*(?P<handle>.+)\ntype:\s*(?P<type>.+)\ncontact:\s*(?P<name>.+)\n(?:.+\n)*?(?:address:\s*(?P<street1>.+)\naddress:\s*(?P<street2>.+)\naddress:\s*(?P<street3>.+)\naddress:\s*(?P<country>.+)\n)?(?:phone:\s*(?P<phone>.+)\n)?(?:fax-no:\s*(?P<fax>.+)\n)?(?:.+\n)*?(?:e-mail:\s*(?P<email>.+)\n)?(?:.+\n)*?changed:\s*(?P<changedate>[0-9]{2}\/[0-9]{2}\/[0-9]{4}).*\n", # AFNIC madness without country field
"nic-hdl:\s*(?P<handle>.+)\ntype:\s*(?P<type>.+)\ncontact:\s*(?P<name>.+)\n(?:.+\n)*?(?:address:\s*(?P<street1>.+)\n)?(?:address:\s*(?P<street2>.+)\n)?(?:address:\s*(?P<street3>.+)\n)?(?:phone:\s*(?P<phone>.+)\n)?(?:fax-no:\s*(?P<fax>.+)\n)?(?:.+\n)*?(?:e-mail:\s*(?P<email>.+)\n)?(?:.+\n)*?changed:\s*(?P<changedate>[0-9]{2}\/[0-9]{2}\/[0-9]{4}).*\n", # AFNIC madness any country -at all-
"nic-hdl:\s*(?P<handle>.+)\ntype:\s*(?P<type>.+)\ncontact:\s*(?P<name>.+)\n(?:.+\n)*?(?:address:\s*(?P<street1>.+)\n)?(?:address:\s*(?P<street2>.+)\n)?(?:address:\s*(?P<street3>.+)\n)?(?:address:\s*(?P<street4>.+)\n)?country:\s*(?P<country>.+)\n(?:phone:\s*(?P<phone>.+)\n)?(?:fax-no:\s*(?P<fax>.+)\n)?(?:.+\n)*?(?:e-mail:\s*(?P<email>.+)\n)?(?:.+\n)*?changed:\s*(?P<changedate>[0-9]{2}\/[0-9]{2}\/[0-9]{4}).*\n", # AFNIC madness with country field
]
nic_contact_references = {
"registrant": [
"registrant:\s*(?P<handle>.+)", # nic.at
@ -562,7 +565,7 @@ def parse_registrants(data):
"billing-c:\s*(?P<handle>.+)" # iis.se
]
}
# Why do the below? The below is meant to handle with an edge case (issue #2) where a partial match followed
# by a failure, for a regex containing the \s*.+ pattern, would send the regex module on a wild goose hunt for
# matching positions. The workaround is to use \S.* instead of .+, but in the interest of keeping the regexes
@ -572,35 +575,35 @@ def parse_registrants(data):
tech_contact_regexes = [preprocess_regex(regex) for regex in tech_contact_regexes]
admin_contact_regexes = [preprocess_regex(regex) for regex in admin_contact_regexes]
billing_contact_regexes = [preprocess_regex(regex) for regex in billing_contact_regexes]
for segment in data:
for regex in registrant_regexes:
match = re.search(regex, segment)
if match is not None:
registrant = match.groupdict()
break
for segment in data:
for regex in tech_contact_regexes:
match = re.search(regex, segment)
if match is not None:
tech_contact = match.groupdict()
break
for segment in data:
for regex in admin_contact_regexes:
match = re.search(regex, segment)
if match is not None:
admin_contact = match.groupdict()
break
for segment in data:
for regex in billing_contact_regexes:
match = re.search(regex, segment)
if match is not None:
billing_contact = match.groupdict()
break
# Find NIC handle contact definitions
handle_contacts = []
for regex in nic_contact_regexes:
@ -608,7 +611,7 @@ def parse_registrants(data):
matches = re.finditer(regex, segment)
for match in matches:
handle_contacts.append(match.groupdict())
# Find NIC handle references and process them
for category in nic_contact_references:
for regex in nic_contact_references[category]:
@ -631,11 +634,11 @@ def parse_registrants(data):
elif category == "admin":
admin_contact = data_reference
break
# Post-processing
for obj in (registrant, tech_contact, billing_contact, admin_contact):
if obj is not None:
for key in obj.keys():
for key in list(obj.keys()):
if obj[key] is None or obj[key].strip() == "": # Just chomp all surrounding whitespace
del obj[key]
else:
@ -672,7 +675,7 @@ def parse_registrants(data):
if 'lastname' in obj:
elements.append(obj["lastname"])
obj["name"] = " ".join(elements)
return {
"registrant": registrant,
"tech": tech_contact,

@ -1,6 +1,27 @@
#!/usr/bin/env python2
import sys, argparse, os, pythonwhois, json, datetime
import sys, argparse, os, pythonwhois, json, datetime, codecs
import pkgutil
import encodings
def get_codecs():
"""Dynamically get list of codecs in python."""
false_positives = set(["aliases"])
found = set(name for imp, name, ispkg in pkgutil.iter_modules(encodings.__path__) if not ispkg)
found.difference_update(false_positives)
return found
def read_encoded_file(file_path):
"""Try reading file using all codecs. Return the first succesfull one."""
for encoding in get_codecs():
try:
with codecs.open(file_path, "r", encoding) as f:
return f.read()
except Exception:
pass
parser = argparse.ArgumentParser(description="Runs or modifies the test suite for python-whois.")
parser.add_argument("mode", nargs=1, choices=["run", "update"], default="run", help="Whether to run or update the tests. Only update if you know what you're doing!")
@ -14,7 +35,7 @@ ENDC = '\033[0m'
def encoded_json_dumps(obj):
try:
return json.dumps(obj, default=json_fallback)
except UnicodeDecodeError, e:
except UnicodeDecodeError as e:
return json.dumps(recursive_encode(obj, "latin-1"), default=json_fallback)
def json_fallback(obj):
@ -24,7 +45,7 @@ def json_fallback(obj):
return obj
def recursive_encode(obj, encoding):
for key in obj.keys():
for key in list(obj.keys()):
if isinstance(obj[key], dict):
obj[key] = recursive_encode(obj[key], encoding)
elif isinstance(obj[key], list):
@ -74,18 +95,26 @@ if args.mode[0] == "run":
suites = []
for target in targets:
try:
with open(os.path.join("test/data", target), "r") as f:
with codecs.open(os.path.join("test/data", target), "r") as f:
data = f.read().split("\n--\n")
except IOError, e:
except IOError as e:
sys.stderr.write("Invalid domain %(domain)s specified. No test case or base data exists.\n" % {"domain": target})
errors = True
continue
try:
with open(os.path.join("test/target_default", target), "r") as f:
except UnicodeDecodeError:
try:
# Try cp1252 (ufpa.br uses that)
with codecs.open(os.path.join("test/data", target), "r", 'cp1252') as f:
data = f.read().split("\n--\n")
except UnicodeDecodeError as e:
# Fall back to trying all registered codecs
data = read_encoded_file(os.path.join("test/data", target)).split("\n--\n")
try:
with codecs.open(os.path.join("test/target_default", target), "r") as f:
default = f.read()
with open(os.path.join("test/target_normalized", target), "r") as f:
with codecs.open(os.path.join("test/target_normalized", target), "r") as f:
normalized = f.read()
except IOError, e:
except IOError as e:
sys.stderr.write("Missing target data for domain %(domain)s. Run `./test.py update %(domain)s` to correct this, after verifying that pythonwhois can correctly parse this particular domain.\n" % {"domain": target})
errors = True
continue
@ -152,10 +181,10 @@ elif args.mode[0] == "update":
updates = []
for target in targets:
try:
with open(os.path.join("test/data", target), "r") as f:
with codecs.open(os.path.join("test/data", target), "r") as f:
data = f.read().split("\n--\n")
updates.append((target, data))
except IOError, e:
except IOError as e:
sys.stderr.write("Invalid domain %(domain)s specified. No base data exists.\n" % {"domain": target})
errors = True
continue
@ -166,8 +195,8 @@ elif args.mode[0] == "update":
for target, data in updates:
default = pythonwhois.parse.parse_raw_whois(data)
normalized = pythonwhois.parse.parse_raw_whois(data, normalized=True)
with open(os.path.join("test/target_default", target), "w") as f:
with codecs.open(os.path.join("test/target_default", target), "w") as f:
f.write(encoded_json_dumps(default))
with open(os.path.join("test/target_normalized", target), "w") as f:
f.write(encoded_json_dumps(normalized))
print "Generated target data for %s." % target
with codecs.open(os.path.join("test/target_normalized", target), "w") as f:
f.write(encoded_json_dumps(normalized))
print("Generated target data for %s." % target)

Loading…
Cancel
Save