Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
from flask import Flask, request, render_template
import ipaddress
import re
import requests
import json
import os
# point to templates directory relative to home directory
template_dir = template_dir = os.path.join(os.path.dirname(__file__), 'templates')
app = Flask(__name__, template_folder=template_dir)
def get_asn_from_as(asn):
# Remove the "AS" from the beginning of the ASN
if asn.startswith("AS") or asn.startswith("as"):
asn = asn[2:]
asn = int(asn)
return asn
def get_more_specifics(data):
"""
:param data: json data from RIPEstat API
:return: list of more specific prefixes and their origins
"""
prefixes = []
origins = []
for prefix_origin in data["data"]["more_specifics"]:
prefixes.append(prefix_origin['prefix'])
origins.append(prefix_origin['origin'])
return [prefixes, origins]
def get_prefix_roa_status(prefix, origin):
"""
:param prefix: prefix to check
:param origin: origin ASN to check
:return: ROV status of the prefix based on RIPEstat ROA data
"""
url = f"https://stat.ripe.net/data/rpki-validation/data.json?resource={origin}&prefix={prefix}"
response = requests.get(url)
data = json.loads(response.text)
data_str = json.dumps(data, indent=4)
# print(data_str)
try:
validation_status = data["data"]["status"]
except KeyError:
return None
return(validation_status)
def get_prefix_info(prefix):
"""
:param prefix: covering prefix to check for more specific prefixes
:return: list of more specific prefixes and their origins
"""
url = "https://stat.ripe.net/data/routing-status/data.json?resource=" + prefix
try:
response = requests.get(url)
except requests.exceptions.RequestException:
return None
data = json.loads(response.text)
try:
if "prefix" in data["data"]["last_seen"]:
seen_origin = data["data"]["last_seen"]["origin"]
more_specifics, more_specific_origins = get_more_specifics(data)
return [prefix, seen_origin, more_specifics, more_specific_origins]
except KeyError:
return None
def return_rov_status(roa_prefix, roa_maxlen, roa_asn, prefix, origin_asn):
""" given a prefix and an origin ASN, as well as a proposed ROA (prefix, maxlen, asn),
determine if the ROA covers the prefix and origin ASN
:param roa_prefix: prefix from the ROA
:param roa_maxlen: maximum length of the prefix from the ROA
:param roa_asn: ASN from the ROA
:param prefix: prefix from the routing table
:param origin_asn: origin ASN from the routing table
:return: ROV status of the prefix"""
roa_ip_prefix = ipaddress.ip_network(roa_prefix)
ip_prefix = ipaddress.ip_network(prefix)
if not ip_prefix.subnet_of(roa_ip_prefix):
return "error: prefix not covered by ROA"
if ip_prefix.subnet_of(roa_ip_prefix) and int(roa_maxlen) >= int(ip_prefix.prefixlen) and int(roa_asn) == int(origin_asn):
return "valid"
else:
return "invalid"
def is_valid_prefix(prefix):
# Check if the prefix is a valid IPv4 or IPv6 prefix
try:
# Try to parse the prefix as an IPv4 or IPv6 prefix
ip_network = ipaddress.ip_network(prefix)
return True
except ValueError:
return False
def is_valid_asn(asn):
# Check if the ASN is a string starting with "AS", followed by numbers or just a number
if asn.isdigit():
return True
if re.match(r'^[Aa][Ss]\d+$', asn):
return True
else:
return False
def is_valid_prefix_maxlength(ip_prefix, prefix_maxlength):
""" check to see prefix_maxlength is a valid value for the prefix
:param ip_prefix: prefix to check
:param prefix_maxlength: maximum length of the prefix
:return: True if prefix_maxlength is valid, False otherwise
"""
try:
# Try to parse the prefix maxlength as an integer
prefix_maxlength = int(prefix_maxlength)
except ValueError:
return False
# Check if the prefix maxlength is in the valid range based on the type of ip_prefix
if ipaddress.ip_network(ip_prefix).version == 4:
return 1 <= prefix_maxlength <= 32 and prefix_maxlength >= ipaddress.ip_network(ip_prefix).prefixlen
elif ipaddress.ip_network(ip_prefix).version == 6:
return 1 <= prefix_maxlength <= 128 and prefix_maxlength >= ipaddress.ip_network(ip_prefix).prefixlen
def check_list_of_prefixes_against_ROA(origin, prefixes, origins, roa_prefix, roa_maxlen, roa_asn):
""" given a list of prefixes and their origins, check if the ROA covers them
:param origin: origin ASN
:param prefixes: list of prefixes
:param origins: list of origins
:param roa_prefix: prefix from the ROA
:param roa_maxlen: maximum length of the prefix from the ROA
:param roa_asn: ASN from the ROA
:return: list output lines to be displayed on the web page
"""
messages = []
existing_roa_status = get_prefix_roa_status(roa_prefix, origin)
messages.append([roa_prefix, return_rov_status(roa_prefix, roa_maxlen, roa_asn, roa_prefix, origin), origin,
existing_roa_status])
prefix_origin = zip(prefixes, origins)
for prefix, origin in prefix_origin:
existing_roa_status = get_prefix_roa_status(prefix, origin)
messages.append([prefix, return_rov_status(roa_prefix, roa_maxlen, roa_asn, prefix, origin), origin,
existing_roa_status])
return messages
@app.route('/results', methods=['GET'])
def results():
""" process the form data and return the results """
if request.method == 'GET':
roa_ip_prefix = request.args.get('ip_prefix')
roa_ip_prefix = roa_ip_prefix.strip()
origin_asn = request.args.get('origin_asn')
origin_asn = origin_asn.strip()
origin_asn = get_asn_from_as(origin_asn)
roa_origin_asn = origin_asn
roa_prefix_maxlength = request.args.get('prefix_maxlength')
roa_prefix_maxlength = roa_prefix_maxlength.strip()
# Validate the IP prefix
if not is_valid_prefix(roa_ip_prefix):
return f"\"{roa_ip_prefix}\" is an Invalid IP prefix"
# Validate the origin ASN
if not is_valid_asn(origin_asn):
return f"\"{origin_asn}\" is an Invalid origin ASN"
# Validate the prefix maxlength
if not is_valid_prefix_maxlength(roa_ip_prefix, roa_prefix_maxlength):
return f"\"{roa_prefix_maxlength}\" is an Invalid prefix maxlength"
roa_prefix_maxlength = int(roa_prefix_maxlength)
prefix_info = get_prefix_info(roa_ip_prefix)
if prefix_info is None:
return "Prefix not found or problems with RIPEstat API"
prefix, seen_origin, more_specifics, more_specific_origins = prefix_info
items = check_list_of_prefixes_against_ROA(seen_origin, more_specifics, more_specific_origins, roa_ip_prefix,
roa_prefix_maxlength, roa_origin_asn)
roa_info = [roa_ip_prefix, roa_prefix_maxlength, roa_origin_asn]
return render_template('render.html', items=items, roa_info=roa_info)
else:
return render_template('index.html')
@app.route('/', methods=['GET'])
def index():
return render_template('index.html')
if __name__ == '__main__':
print("new version - ssw")
app.run(port=8000, host='0.0.0.0')