Skip to content
Permalink
00ad1f3971
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
312 lines (257 sloc) 11.9 KB
import os
import time
import pandas as pd
import radix
import ipaddress
import requests
import zipfile
import json
class ARINAgreementChecker:
    """Check whether an IP prefix is covered by an ARIN agreement.

    ARIN is the American Registry for Internet Numbers. On construction the
    class downloads (or reuses a cached copy of) ARIN's networks report,
    loads ``networks.csv`` into a pandas DataFrame and builds two radix
    trees for longest-prefix lookups.

    Attributes:
        resources_under_agreement_report_url (str): URL of the ARIN dataset.
        arin_zip_file_name (str): Local file name of the downloaded zip file.
        networks_filename (str): Name of the CSV with network information
            that is extracted from the zip file.
        rtree_arin_info (radix.Radix): Radix tree of every network in the
            dataset; each node carries ``org_handle`` and ``org_name``.
        rtree_under_arin_agreement (radix.Radix): Radix tree of the networks
            whose ``Status`` is "Full Registry Services".
        arin_df (pandas.DataFrame): The raw ``networks.csv`` table.
    """

    # Seconds to wait for the ARIN server before giving up on the download.
    DOWNLOAD_TIMEOUT_SECONDS = 60
    # The cached zip file is refreshed once it is older than this many hours.
    CACHE_MAX_AGE_HOURS = 8

    def __init__(self, force_data_refresh=False):
        """Load the ARIN data and populate both radix trees.

        Args:
            force_data_refresh (bool): When True, download the dataset again
                even if a recent local copy exists.
        """
        self.resources_under_agreement_report_url = (
            "https://ftp.arin.net/pub/resource_registry_service/arin-networks-asns.zip"
        )
        self.arin_zip_file_name = "resources_under_agreement_report.zip"
        self.networks_filename = "networks.csv"
        self.rtree_arin_info = radix.Radix()
        self.rtree_under_arin_agreement = radix.Radix()
        self.arin_df = self._load_arin_data(force_data_refresh)
        self._collect_arin_data(self.arin_df)

    def _is_file_present(self, file_path):
        """Check whether a regular file exists at the given path.

        Args:
            file_path (str): Path to the file.

        Returns:
            bool: True if the file exists, False otherwise.
        """
        return os.path.isfile(file_path)

    def _remove_leading_zeros(self, ip_address):
        """Strip leading zeros from every octet of a dotted IPv4 address.

        The ``ipaddress`` module rejects zero-padded octets such as
        ``010.001.000.000``, so addresses are normalised first.

        Args:
            ip_address (str): The IPv4 address, possibly zero-padded.

        Returns:
            str: The address with each octet rendered as a plain integer.

        Raises:
            ValueError: If an octet is not a valid integer.
        """
        return ".".join(str(int(octet)) for octet in ip_address.split("."))

    def _is_file_older_than_x_hours(self, file_path, hours=24):
        """Check whether a file was last modified more than ``hours`` ago.

        Args:
            file_path (str): Path to the file; it must exist.
            hours (int, optional): Age threshold in hours. Defaults to 24.

        Returns:
            bool: True if the file is older than the threshold.
        """
        file_age = time.time() - os.path.getmtime(file_path)
        return file_age > hours * 3600

    def _download_arin_data(self):
        """Download the ARIN dataset zip file to ``arin_zip_file_name``.

        Raises:
            requests.RequestException: If the request times out, fails, or
                the server answers with an HTTP error status.
        """
        response = requests.get(
            self.resources_under_agreement_report_url,
            timeout=self.DOWNLOAD_TIMEOUT_SECONDS,
        )
        # Validate before opening the target file so that a failed download
        # never truncates or replaces a previously cached, valid zip file.
        response.raise_for_status()
        print(f"Downloading {self.arin_zip_file_name}")
        with open(self.arin_zip_file_name, "wb") as file:
            file.write(response.content)

    def _unzip_arin_data(self):
        """Extract the ARIN dataset zip file into the current directory."""
        with zipfile.ZipFile(self.arin_zip_file_name, "r") as zip_ref:
            zip_ref.extractall(".")

    def _return_ARIN_data_as_df(self):
        """Read the extracted networks CSV into a pandas DataFrame.

        Returns:
            pandas.DataFrame: DataFrame containing the ARIN dataset.
        """
        return pd.read_csv(self.networks_filename)

    def covered_by_arin_agreement(self, ip_address):
        """Check whether an IP address or prefix is under an ARIN agreement.

        Args:
            ip_address (str): The IP address or prefix to check.

        Returns:
            bool: True if a network in the agreement tree covers it.
        """
        return self.rtree_under_arin_agreement.search_best(ip_address) is not None

    def _remove_ending_string(self, name, ending="-Z"):
        """Remove a trailing suffix from a string, if present.

        Args:
            name (str): The string to modify.
            ending (str): The suffix to remove. Defaults to "-Z".

        Returns:
            str: ``name`` without the suffix, or ``name`` unchanged.
        """
        # An empty suffix must be a no-op: name[:-0] would be name[:0] == "".
        if ending and name.endswith(ending):
            return name[: -len(ending)]
        return name

    def _populate_arin_agreement_tree(self, prefixes):
        """Fill the agreement radix tree from the collected prefixes.

        Args:
            prefixes (dict): Maps a network type ("IPv4" / "IPv6") to a list
                of prefix strings. Each list must be single-version because
                ``ipaddress.collapse_addresses`` rejects mixed versions.
        """
        for network_list in prefixes.values():
            ip_networks = [ipaddress.ip_network(network) for network in network_list]
            # Merge adjacent / overlapping networks to keep the tree small.
            for network in ipaddress.collapse_addresses(ip_networks):
                rnode = self.rtree_under_arin_agreement.add(str(network))
                rnode.data["status"] = "Under ARIN agreement"

    def return_org_handle_and_org_name(self, prefix):
        """Return the organization handle and name for an IP prefix.

        Args:
            prefix (str): The IP prefix.

        Returns:
            tuple: ``(org_handle, org_name)`` of the best-matching network,
            or ``(None, None)`` when no network covers the prefix.
        """
        rnode = self.rtree_arin_info.search_best(prefix)
        if rnode is None:
            return None, None
        return rnode.data["org_handle"], rnode.data["org_name"]

    def is_from_ARIN(self, prefix):
        """Check whether an IP prefix is covered by any network in the dataset.

        Args:
            prefix (str): The IP prefix.

        Returns:
            bool: True if the info tree holds a covering network.
        """
        return self.rtree_arin_info.search_best(prefix) is not None

    def _populate_arin_info_tree(self, by_org_prefixes):
        """Fill the info radix tree with per-organization networks.

        Args:
            by_org_prefixes (dict): Maps an org handle to a dict with the
                keys "IPv4" and "IPv6" (lists of prefix strings) and
                "org_name".
        """
        for org_handle, org_entry in by_org_prefixes.items():
            for network_type in ("IPv4", "IPv6"):
                org_prefixes = org_entry[network_type]
                if not org_prefixes:
                    continue
                ip_networks = [
                    ipaddress.ip_network(network) for network in org_prefixes
                ]
                # Collapse per organization so each node keeps a single owner.
                for network in ipaddress.collapse_addresses(ip_networks):
                    rnode = self.rtree_arin_info.add(str(network))
                    rnode.data["org_handle"] = org_handle
                    rnode.data["org_name"] = org_entry["org_name"]

    def _pretty_print_dict(self, dict):
        """Print a dictionary as indented JSON (debugging helper).

        Args:
            dict (dict): The JSON-serialisable dictionary to print. The name
                shadows the builtin; it is kept for interface compatibility.
        """
        print(json.dumps(dict, indent=4))

    def _list_of_IP_networks_from_row(self, row):
        """Convert a row's start/end IP range into a list of CIDR networks.

        Args:
            row (pandas.Series): A row providing "Start IP" and "End IP".

        Returns:
            tuple: ``(networks, prefix_type)`` where ``networks`` is a list
            of CIDR strings covering the range and ``prefix_type`` is
            "IPv4" or "IPv6".
        """
        start_ip = row["Start IP"]
        end_ip = row["End IP"]
        # A dot in the start address is what marks the row as IPv4.
        if "." in start_ip:
            prefix_type = "IPv4"
            start_ip = self._remove_leading_zeros(start_ip)
            end_ip = self._remove_leading_zeros(end_ip)
        else:
            prefix_type = "IPv6"
        networks = ipaddress.summarize_address_range(
            ipaddress.ip_address(start_ip), ipaddress.ip_address(end_ip)
        )
        return [str(network) for network in networks], prefix_type

    def _does_prefix_cover_any_prefix_in_list(self, prefix, prefix_list):
        """Check whether a prefix covers, or is covered by, any listed prefix.

        Args:
            prefix (str): The IP prefix.
            prefix_list (list): A list of IP prefix strings.

        Returns:
            bool: True if some entry of ``prefix_list`` is a subnet or a
            supernet of ``prefix`` (equal networks count), False otherwise.
        """
        prefix1 = ipaddress.ip_network(prefix)
        for network in prefix_list:
            prefix2 = ipaddress.ip_network(network)
            # Networks of different IP versions can never contain each other
            # (and subnet_of would raise TypeError); skip this entry but keep
            # scanning the rest of the list.
            if prefix1.version != prefix2.version:
                continue
            if prefix2.subnet_of(prefix1) or prefix2.supernet_of(prefix1):
                return True
        return False

    def prefix_network_handles(self, prefix):
        """Return the network handles related to an IP prefix.

        Looks up the organization owning the prefix, then collects the
        handle of each of that organization's networks that overlaps it.

        Args:
            prefix (str): The IP prefix.

        Returns:
            list: Network handles; empty if the prefix is not in the dataset.
        """
        if not self.is_from_ARIN(prefix):
            return []
        org_handle = self.return_org_handle_and_org_name(prefix)[0]
        network_handles = []
        for _, row in self._table_for_org_handle(org_handle).iterrows():
            row_networks, _ = self._list_of_IP_networks_from_row(row)
            if self._does_prefix_cover_any_prefix_in_list(prefix, row_networks):
                network_handles.append(row["Network Handle"])
        return network_handles

    def _table_for_org_handle(self, org_handle):
        """Return the dataset rows belonging to an organization handle.

        Args:
            org_handle (str): The organization handle, with or without the
                "-Z" suffix.

        Returns:
            pandas.DataFrame: Rows whose "Org Handle" matches the handle.
        """
        org_handle = self._remove_ending_string(org_handle)
        # The dataset may list an organization both as 'HANDLE' and as
        # 'HANDLE-Z', so match either spelling.
        org_handles = [org_handle, org_handle + "-Z"]
        return self.arin_df[self.arin_df["Org Handle"].isin(org_handles)]

    def _collect_arin_data(self, table):
        """Populate both radix trees from the ARIN network table.

        Every row feeds the per-organization info tree; only rows whose
        "Status" is "Full Registry Services" feed the agreement tree.

        Args:
            table (pandas.DataFrame): DataFrame containing ARIN network data.
        """
        prefixes = {"IPv4": [], "IPv6": []}
        by_org_prefixes = {}
        for _, row in table.iterrows():
            networks, prefix_type = self._list_of_IP_networks_from_row(row)
            # 'HANDLE' and 'HANDLE-Z' are treated as the same organization.
            org_handle = self._remove_ending_string(row["Org Handle"])
            if org_handle not in by_org_prefixes:
                # The name from the first row seen for this handle is kept.
                by_org_prefixes[org_handle] = {
                    "IPv4": [],
                    "IPv6": [],
                    "org_name": row["Org Name"],
                }
            by_org_prefixes[org_handle][prefix_type].extend(networks)
            if row["Status"] == "Full Registry Services":
                prefixes[prefix_type].extend(networks)
        self._populate_arin_agreement_tree(prefixes)
        self._populate_arin_info_tree(by_org_prefixes)

    def return_rtree_under_arin_agreement(self):
        """Return the radix tree containing networks under ARIN agreement.

        Returns:
            radix.Radix: The radix tree of networks under ARIN agreement.
        """
        return self.rtree_under_arin_agreement

    def _load_arin_data(self, force_data_refresh=False):
        """Load the ARIN dataset, downloading and extracting it if necessary.

        A download happens when a refresh is forced, when the zip or the
        extracted CSV is missing, or when the zip is older than
        ``CACHE_MAX_AGE_HOURS``.

        Args:
            force_data_refresh (bool): Download even if the cache is fresh.

        Returns:
            pandas.DataFrame: DataFrame containing the ARIN dataset.
        """
        # The age check must stay last: it is only reached (short-circuit
        # `or`) once the zip file is known to exist.
        needs_refresh = (
            force_data_refresh
            or not self._is_file_present(self.arin_zip_file_name)
            or not self._is_file_present(self.networks_filename)
            or self._is_file_older_than_x_hours(
                self.arin_zip_file_name, self.CACHE_MAX_AGE_HOURS
            )
        )
        if needs_refresh:
            self._download_arin_data()
            self._unzip_arin_data()
        return self._return_ARIN_data_as_df()