Download reverse DNS map from GitHub

This commit is contained in:
Sean Whalen
2024-03-26 12:07:10 -04:00
parent ed593a0b49
commit 18f7508a1f
2 changed files with 48 additions and 13 deletions

View File

@@ -49,6 +49,7 @@ MAGIC_XML = b"\x3c\x3f\x78\x6d\x6c\x20"
MAGIC_JSON = b"\7b"
IP_ADDRESS_CACHE = ExpiringDict(max_len=10000, max_age_seconds=1800)
REVERSE_DNS_MAP = dict()
class ParserError(RuntimeError):
@@ -840,6 +841,7 @@ def parse_forensic_report(feedback_report, sample, msg_date,
ip_address = re.split(r'\s', parsed_report["source_ip"]).pop(0)
parsed_report_source = get_ip_address_info(ip_address,
cache=IP_ADDRESS_CACHE,
ip_db_path=ip_db_path,
offline=offline,
nameservers=nameservers,

View File

@@ -17,6 +17,8 @@ import atexit
import mailbox
import re
import csv
import io
try:
import importlib.resources as pkg_resources
except ImportError:
@@ -30,6 +32,7 @@ import dns.exception
import geoip2.database
import geoip2.errors
import publicsuffixlist
import requests
from parsedmarc.log import logger
import parsedmarc.resources.dbip
@@ -295,36 +298,63 @@ def get_ip_address_country(ip_address, db_path=None):
return country
def get_service_from_reverse_dns_base_domain(base_domain):
def get_service_from_reverse_dns_base_domain(base_domain,
offline=False,
reverse_dns_map=None):
"""
Returns the service name of a given base domain name from reverse DNS.
Args:
base_domain (str): The base domain of the reverse DNS lookup
offline (bool): Use the built-in copy of the reverse DNS map
reverse_dns_map (dict): A reverse DNS map
Returns:
dict: A dictionary containing name and type.
If the service is unknown, the name will be
the supplied reverse_dns_base_domain and the type will be None
"""
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = dict(
name=row["name"],
type=row["type"])
base_domain = base_domain.lower().strip()
service_map = dict()
with pkg_resources.path(parsedmarc.resources.maps,
"base_reverse_dns_map.csv") as path:
with open(path) as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
service_map[row["base_reverse_dns"].lower().strip()] = dict(
name=row["name"],
type=row["type"])
url = ("https://raw.githubusercontent.com/domainaware/parsedmarc/master/"
"parsedmarc/resources/maps/base_reverse_dns_map.csv")
if reverse_dns_map is None:
reverse_dns_map = dict()
csv_file = io.StringIO()
if not offline and len(reverse_dns_map) == 0:
try:
logger.debug(f"Trying to fetch "
f"reverse DNS map from {url}...")
csv_file.write(requests.get(url).text)
csv_file.seek(0)
load_csv(csv_file)
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to fetch reverse DNS map: {e}")
if len(reverse_dns_map) == 0:
logger.info("Loading included reverse DNS map...")
with pkg_resources.path(parsedmarc.resources.maps,
"base_reverse_dns_map.csv") as path:
with open(path) as csv_file:
load_csv(csv_file)
try:
service = service_map[base_domain]
service = reverse_dns_map[base_domain]
except KeyError:
service = dict(name=base_domain, type=None)
return service
def get_ip_address_info(ip_address, ip_db_path=None, cache=None, offline=False,
def get_ip_address_info(ip_address, ip_db_path=None,
cache=None,
reverse_dns_map=None,
offline=False,
nameservers=None, timeout=2.0):
"""
Returns reverse DNS and country information for the given IP address
@@ -333,6 +363,7 @@ def get_ip_address_info(ip_address, ip_db_path=None, cache=None, offline=False,
ip_address (str): The IP address to check
ip_db_path (str): path to a MMDB file from MaxMind or DBIP
cache (ExpiringDict): Cache storage
reverse_dns_map (dict): A reverse DNS map
offline (bool): Do not make online queries for geolocation or DNS
nameservers (list): A list of one or more nameservers to use
(Cloudflare's public DNS resolvers by default)
@@ -368,7 +399,9 @@ def get_ip_address_info(ip_address, ip_db_path=None, cache=None, offline=False,
info["type"] = None
if reverse_dns is not None:
base_domain = get_base_domain(reverse_dns)
service = get_service_from_reverse_dns_base_domain(base_domain)
service = get_service_from_reverse_dns_base_domain(base_domain,
offline=offline,
reverse_dns_map=reverse_dns_map)
info["base_domain"] = base_domain
info["type"] = service["type"]
info["name"] = service["name"]