Skip to content
Permalink
Browse files
Initial commit
  • Loading branch information
ssw committed Jan 4, 2023
0 parents commit 8ba54c36f935b41308fe947ef8c89d60d13e0481
Showing 9 changed files with 273 additions and 0 deletions.
@@ -0,0 +1,2 @@
# Auto detect text files and perform LF normalization
* text=auto

Some generated files are not rendered by default. Learn more.

Some generated files are not rendered by default. Learn more.

Some generated files are not rendered by default. Learn more.

Some generated files are not rendered by default. Learn more.

Some generated files are not rendered by default. Learn more.

Some generated files are not rendered by default. Learn more.

174 main.py
@@ -0,0 +1,174 @@
from flask import Flask, request, render_template
import ipaddress
import re
import requests
import json
import datetime

# given a time string, return the hours between now and that time

def get_asn_from_as(asn):
# Remove the "AS" from the beginning of the ASN
if asn.startswith("AS") or asn.startswith("as"):
asn = asn[2:]
return asn

def get_time_delta(time_string):
# get the current time
current_time = datetime.datetime.now()
# get the time of the post
post_time = datetime.datetime.strptime(time_string, "%Y-%m-%dT%H:%M:%S")
# get the difference between the two times
delta = current_time - post_time
# return the number of hours between the two times
if delta.total_seconds() < 3600:
return "less than an hour ago"
return (f"{int(delta.total_seconds() / 3600)} hours ago")

# using RIPEstat API, given an AS number, return the set of prefixes announced by that AS

def get_less_specifics(data):
"""
:param data: json data from RIPEstat API
:return: list of less specific prefixes and their origins
"""
prefixes = []
origins = []
for prefix_origin in data["data"]["less_specifics"]:
prefixes.append(prefix_origin['prefix'])
origins.append(prefix_origin['origin'])
return [prefixes, origins]

def get_more_specifics(data):
"""
:param data: json data from RIPEstat API
:return: list of more specific prefixes and their origins
"""
prefixes = []
origins = []
for prefix_origin in data["data"]["more_specifics"]:
prefixes.append(prefix_origin['prefix'])
origins.append(prefix_origin['origin'])
return [prefixes, origins]

def get_prefix_info(prefix):
""" given a prefix, return the prefix information from RIPEstat API
:param prefix: prefix to query
:return: json data from RIPEstat API"""
url = "https://stat.ripe.net/data/routing-status/data.json?resource=" + prefix
response = requests.get(url)
print(response.text)
data = json.loads(response.text)
if data["messages"] != []:
if "error" in data["messages"][0]:
return_type = -1
error_message = data["messages"][0][1]
return [return_type, error_message]

if "prefix" in data["data"]["last_seen"]:
return_type = 1
prefix_last_seen = data["data"]["last_seen"]["prefix"]
last_seen_date = data["data"]["last_seen"]["time"]
last_seen_since_hours = get_time_delta(last_seen_date)
origin_asn = data["data"]["last_seen"]["origin"]
less_specifics, less_specific_origins = get_less_specifics(data)
more_specifics, more_specific_origins = get_more_specifics(data)
return [return_type, prefix_last_seen, last_seen_since_hours, origin_asn, less_specifics, less_specific_origins, more_specifics, more_specific_origins]

if "prefix" in data["data"]["less_specifics"][0]:
return_type = 2
less_specifics, less_specific_origins = get_less_specifics(data)
more_specifics, more_specific_origins = get_more_specifics(data)
return [return_type, less_specifics, less_specific_origins, more_specifics, more_specific_origins]


def return_rov_status(roa_prefix, roa_maxlen, roa_asn, prefix, origin_asn):
""" given a prefix and an origin ASN, determine if the prefix is ROV compliant
:param roa_prefix: prefix from the ROA
:param roa_maxlen: maximum length of the prefix from the ROA
:param roa_asn: ASN from the ROA
:param prefix: prefix from the routing table
:param origin_asn: origin ASN from the routing table
:return: ROV status of the prefix"""

roa_ip_prefix = ipaddress.ip_network(roa_prefix)
ip_prefix = ipaddress.ip_network(prefix)
if not ip_prefix.subnet_of(roa_ip_prefix):
return "Not covered by ROA"
if ip_prefix.subnet_of(roa_ip_prefix) and roa_maxlen >= ip_prefix.prefixlen and roa_asn == origin_asn:
return "Valid"
else:
return "Invalid"
app = Flask(__name__)

def is_valid_prefix(prefix):
try:
# Try to parse the prefix as an IPv4 or IPv6 prefix
ip_network = ipaddress.ip_network(prefix)
return True
except ValueError:
return False

def is_valid_asn(asn):
# Check if the ASN is a string starting with "AS", followed by numbers
if re.match(r'^[Aa][Ss]\d+$', asn):
return True
# Check if the ASN is just numbers
elif asn.isdigit():
return True
else:
return False

def is_valid_prefix_maxlength(ip_prefix, prefix_maxlength):
try:
# Try to parse the prefix maxlength as an integer
prefix_maxlength = int(prefix_maxlength)
except ValueError:
return False

# Check if the prefix maxlength is in the valid range based on the type of ip_prefix
if ipaddress.ip_network(ip_prefix).version == 4:
return 1 <= prefix_maxlength <= 32
elif ipaddress.ip_network(ip_prefix).version == 6:
return 1 <= prefix_maxlength <= 128

@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
# Get the user input
ip_prefix = request.form['ip_prefix']

# Validate the IP prefix
if not is_valid_prefix(ip_prefix):
return 'Invalid IP prefix'

prefix_maxlength = request.form['prefix_maxlength']
origin_asn = request.form['origin_asn']

# Validate the origin ASN
if not is_valid_asn(origin_asn):
return 'Invalid origin ASN'

# Validate the prefix maxlength
if not is_valid_prefix_maxlength(ip_prefix, prefix_maxlength):
return 'Invalid prefix maxlength'

# Process the user input (e.g. validate the input, store it in a database, etc.)

ip_prefix = request.form['ip_prefix']
prefix_maxlength = request.form['prefix_maxlength']
origin_asn = request.form['origin_asn']

return get_prefix_info(ip_prefix)

else:
return render_template('index.html')



if __name__ == '__main__':
app.run()

#print(get_prefix_info("199.109.0.0/21"))
@@ -0,0 +1,56 @@
<!doctype html>
<html>
<head>
<title>IP Prefix Form</title>
<style>
body {
font-family: Arial, sans-serif;
}
form {
max-width: 400px;
margin: 0 auto;
padding: 20px;
border: 1px solid #ccc;
border-radius: 10px;
}
label {
display: block;
margin-bottom: 8px;
}
input[type="text"],
input[type="number"] {
width: 100%;
padding: 12px 20px;
margin-bottom: 20px;
box-sizing: border-box;
border: 1px solid #ccc;
border-radius: 4px;
}
input[type="submit"] {
width: 100%;
background-color: #4caf50;
color: white;
padding: 14px 20px;
margin: 8px 0;
border: none;
border-radius: 4px;
cursor: pointer;
}
input[type="submit"]:hover {
background-color: #45a049;
}
</style>
</head>
<body>
<p>This app queries the stat.ripe.net to determine if a RPKI-ROA created with the following information would likely agree (i.e., not evaluate as invalid) for routes currently seen in the Internet</p>
<form method="post">
<label for="ip_prefix">IP Prefix:</label>
<input type="text" name="ip_prefix" required>
<label for="prefix_maxlength">Prefix Maxlength:</label>
<input type="number" name="prefix_maxlength" required>
<label for="origin_asn">Origin ASN:</label>
<input type="text" name="origin_asn" required>
<input type="submit" value="Submit">
</form>
</body>
</html>

0 comments on commit 8ba54c3

Please sign in to comment.