diff --git a/parsedmarc/utils.py b/parsedmarc/utils.py index 694ced5..ceb98f2 100644 --- a/parsedmarc/utils.py +++ b/parsedmarc/utils.py @@ -13,7 +13,6 @@ import mailparser import json import hashlib import base64 -import platform import atexit import mailbox import re @@ -29,16 +28,11 @@ import dns.resolver import dns.exception import geoip2.database import geoip2.errors -import requests -import publicsuffix2 +import publicsuffixlist from parsedmarc.log import logger import parsedmarc.resources -USER_AGENT = "Mozilla/5.0 (({0} {1})) parsedmarc".format( - platform.system(), - platform.release(), - ) parenthesis_regex = re.compile(r'\s*\(.*\)\s*') @@ -83,7 +77,7 @@ def decode_base64(data): return base64.b64decode(data) -def get_base_domain(domain, use_fresh_psl=False): +def get_base_domain(domain): """ Gets the base domain name for the given domain @@ -93,41 +87,13 @@ def get_base_domain(domain, use_fresh_psl=False): Args: domain (str): A domain or subdomain - use_fresh_psl (bool): Download a fresh Public Suffix List Returns: str: The base domain of the given domain """ - psl_path = os.path.join(tempdir, "public_suffix_list.dat") - - def download_psl(): - url = "https://publicsuffix.org/list/public_suffix_list.dat" - # Use a browser-like user agent string to bypass some proxy blocks - headers = {"User-Agent": USER_AGENT} - try: - fresh_psl = requests.get(url, headers=headers).text - with open(psl_path, "w", encoding="utf-8") as fresh_psl_file: - fresh_psl_file.write(fresh_psl) - except Exception as error: - raise DownloadError( - "Failed to download an updated PSL {0}".format(error)) - - if use_fresh_psl: - if not os.path.exists(psl_path): - download_psl() - else: - psl_age = datetime.now() - datetime.fromtimestamp( - os.stat(psl_path).st_mtime) - if psl_age > timedelta(hours=24): - download_psl() - - with open(psl_path, encoding="utf-8") as psl_file: - psl = publicsuffix2.PublicSuffixList(psl_file) - - return psl.get_public_suffix(domain) - else: - return publicsuffix2.get_sld(domain) + psl = publicsuffixlist.PublicSuffixList() + return psl.privatesuffix(domain) def query_dns(domain, record_type, cache=None, nameservers=None, timeout=2.0): diff --git a/pyproject.toml b/pyproject.toml index 6b5fb7e..d6a098f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ dependencies = [ "lxml>=4.4.0", "mailsuite>=1.6.1", "msgraph-core>=0.2.2", - "publicsuffix2>=2.20190812", + "publicsuffixlist>=0.10.0", "requests>=2.22.0", "tqdm>=4.31.1", "urllib3>=1.25.7", diff --git a/requirements.txt b/requirements.txt index 57f8d07..ac19106 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ dnspython>=2.0.0 expiringdict>=1.1.4 urllib3>=1.25.7 requests>=2.22.0 -publicsuffix2>=2.20190812 +publicsuffixlist>=0.10.0 xmltodict>=0.12.0 geoip2>=3.0.0 imapclient>=2.1.0 diff --git a/tests.py b/tests.py index 86ac83a..cab5219 100644 --- a/tests.py +++ b/tests.py @@ -18,14 +18,13 @@ class Test(unittest.TestCase): def testPSLDownload(self): subdomain = "foo.example.com" - result = parsedmarc.utils.get_base_domain(subdomain, - use_fresh_psl=True) + result = parsedmarc.utils.get_base_domain(subdomain) assert result == "example.com" - # Test PSL caching - result = parsedmarc.utils.get_base_domain(subdomain, - use_fresh_psl=True) - assert result == "example.com" + # Test newer PSL entries + subdomain = "e3191.c.akamaiedge.net" + result = parsedmarc.utils.get_base_domain(subdomain) + assert result == "c.akamaiedge.net" def testAggregateSamples(self): """Test sample aggregate/rua DMARC reports"""