Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
james-scripts/prefix-lists.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
executable file
271 lines (210 sloc)
7.98 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import glob | |
import re | |
import xml.etree.ElementTree as et | |
import json | |
from datetime import datetime | |
from collections import namedtuple | |
from collections import defaultdict | |
# One BGP neighbor as configured on a single router.
# `prefix_lists` holds PrefixListRef entries (references by name), not
# full PrefixList records.
BGPNeighbor = namedtuple(
    "BGPNeighbor",
    "rtr_name vrf peer_as address description prefix_lists",
)
# A routing policy's reference to a prefix-list: the list's name and
# whether the reference carried the `orlonger` modifier.
PrefixListRef = namedtuple("PrefixListRef", ("name", "orlonger"))
# The contents of one named prefix-list as found on one router.
PrefixList = namedtuple("PrefixList", ("rtr_name", "name", "prefixes"))
def main():
    """Parse every router config, print a report, and write the JSON dump.

    Reads all ``*.xml`` files under ``../rtsw/xml/`` (relative to the current
    working directory), then prints:

    * every BGP neighbor that references at least one prefix-list,
    * every prefix-list found,
    * the names of prefix-lists whose contents differ between routers,

    and finally writes the collected data to ``PERFIX-LIST-DUMP.json``.
    """
    prefix_lists = []
    neighbors = []

    # Load up config from each router.  glob() returns paths in arbitrary
    # order; sorting keeps the printed report and the JSON dump (which uses
    # stable sorts over this input order) reproducible between runs.
    for config_file in sorted(glob.glob("../rtsw/xml/*.xml")):
        rtr_prefix_lists, rtr_neighbors = parse_router_config(config_file)
        prefix_lists.extend(rtr_prefix_lists)
        neighbors.extend(rtr_neighbors)

    # Output
    print("Prefix List References")
    for ne in neighbors:
        # Neighbors without any prefix-list reference are not interesting here.
        if not ne.prefix_lists:
            continue
        print(" {}".format(ne))

    print("Prefix Lists")
    for pl in prefix_lists:
        print(" {}".format(pl))

    print("Inconsistent Prefix Lists")
    not_consistent = find_inconsistent_prefixlists(prefix_lists)
    for pl_name in not_consistent:
        print(" {}".format(pl_name))

    # NOTE: the file name spelling is kept as-is; other tooling may read it.
    dump_prefix("PERFIX-LIST-DUMP.json", neighbors, prefix_lists, not_consistent)
def dump_prefix(filename, neighbors, prefix_lists, inconsistent_prefixlist_names):
    """Write peers, prefix-lists and the inconsistency report to a JSON file.

    Args:
        filename: Path of the JSON file to create or overwrite.
        neighbors: Iterable of BGPNeighbor tuples.
        prefix_lists: Iterable of PrefixList tuples.
        inconsistent_prefixlist_names: List of prefix-list names flagged as
            inconsistent; written verbatim under the ``inconsistent`` key and
            used to compute each prefix-list's ``consistent`` flag.

    The resulting object has the keys ``timestamp`` (UTC, second resolution,
    no timezone suffix), ``peers`` (sorted by numeric peer AS),
    ``prefixlists`` (sorted by name) and ``inconsistent``.
    """
    # Local import: the module header only imports the `datetime` class.
    from datetime import timezone

    inconsistent_set = set(inconsistent_prefixlist_names)

    # datetime.utcnow() is deprecated since Python 3.12.  Take an aware UTC
    # "now" and strip tzinfo so the emitted string keeps its existing
    # "YYYY-MM-DDTHH:MM:SS" shape (no "+00:00" suffix).
    now_utc = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)

    # Create a peer-list, leaving out iBGP, RR, internal junk.
    external = [
        ne for ne in neighbors
        if ne.peer_as not in (None, "11537", "11164")
        # if len(ne.prefix_lists) > 0 ??
    ]
    peers = [
        {
            'router': ne.rtr_name,
            'peeras': ne.peer_as,
            'vrf': ne.vrf,
            'ip': ne.address,
            'description': ne.description,
            'prefix_lists': [
                pl.name for pl in ne.prefix_lists
                # if ...  # TODO: Filter out ones we don't like
            ],
        }
        for ne in sorted(external, key=lambda ne: int(ne.peer_as or "0"))
    ]

    prefixlists = [
        {
            'name': pl.name,
            'router': pl.rtr_name,
            'prefixes': pl.prefixes,
            'consistent': (pl.name not in inconsistent_set),
        }
        for pl in sorted(prefix_lists, key=lambda pl: pl.name)
        # if len(pl.prefixes) == 0?
    ]

    obj = {
        'timestamp': now_utc.isoformat(),
        'peers': peers,
        'prefixlists': prefixlists,
        'inconsistent': inconsistent_prefixlist_names,
    }

    # Serialize before opening the file so a serialization error cannot
    # leave a truncated dump behind.
    payload = json.dumps(obj, indent=4)
    with open(filename, "w") as out:
        out.write(payload)
def find_inconsistent_prefixlists(prefix_lists):
    """Return the names of prefix-lists whose contents differ between routers.

    Each prefix-list's prefixes are sorted and frozen into a tuple, so two
    lists that differ only in ordering count as identical.  All distinct
    variants seen for a name are collected in a set; a name with more than
    one variant is reported.

    Returns a list of prefix-list names, in order of first appearance.
    """
    variants_by_name = {}
    for plist in prefix_lists:
        canonical = tuple(sorted(plist.prefixes))
        # Adding an already-seen variant to the set is a no-op.
        variants_by_name.setdefault(plist.name, set()).add(canonical)

    inconsistent = []
    for name, variants in variants_by_name.items():
        if len(variants) > 1:
            inconsistent.append(name)
    return inconsistent
def parse_router_config(rtr_xml_file):
    """Parse one router's XML config file.

    Returns a ``(prefix_lists, neighbors)`` pair: a list of PrefixList
    tuples and a list of BGPNeighbor tuples.  Every entry in both lists
    carries the router's (sanitized) hostname.
    """
    tree = et.parse(rtr_xml_file)
    hostname = extract_hostname(tree)
    return (
        extract_prefix_lists(tree, hostname),
        # Routing instances + BGP config
        extract_neighbors(tree, hostname),
    )
# Matches a routing-engine suffix such as "-re0" / "-RE1" in a hostname.
RE_HOSTNAME_CLEANUP = re.compile(r'-re\d', re.IGNORECASE)


def extract_hostname(config_root):
    """Return the sanitized router hostname.

    Takes the text of the first ``system/host-name`` element found anywhere
    below the root and removes every ``-re<digit>`` occurrence
    (case-insensitive) from it.

    Args:
        config_root: Parsed config (an ElementTree or Element).

    Raises:
        ValueError: if no ``system/host-name`` element exists.  Without this
            check the regex substitution would fail with an opaque
            ``TypeError`` on ``None``.
    """
    rtr_name = config_root.findtext('.//system/host-name')
    if rtr_name is None:
        raise ValueError("no system/host-name element found in router config")
    return RE_HOSTNAME_CLEANUP.sub('', rtr_name)
def extract_neighbors(config_root, rtr_name):
    """
    Find all BGP neighbors and the prefix-lists their import policies use.

    Walks the global BGP stanza plus the BGP stanza of every routing
    instance, then every group and every neighbor inside a group.

    Args:
        config_root: Parsed config (an ElementTree or Element).
        rtr_name: Router hostname stored on every returned neighbor.

    Returns:
        A list of BGPNeighbor tuples.  ``vrf`` is None for the global
        instance, otherwise the routing-instance name.

    Notes:
        * Neighbors carrying an ``inactive`` attribute are skipped.
        * ``import`` and ``peer-as`` set on the neighbor win; otherwise the
          enclosing group's values are used.
        * An import policy with no matching policy-statement contributes no
          prefix-list references.
        * LIMITATION - neighbors that exist outside of groups are not handled.
    """
    # Get policy -> prefix-list map
    # (policy_name) => [PrefixListRef, ...]
    prefix_references = extract_prefix_references(config_root)

    routing_instances = []  # (name, bgp_config_xml), ...

    # Populate routing instances - Global instance.
    # A config without a global BGP stanza is skipped (same treatment as a
    # VRF without BGP below) instead of aborting the whole run; an `assert`
    # would also vanish under `python -O`.
    root_bgp = config_root.find('.//configuration/protocols/bgp')
    if root_bgp is not None:
        routing_instances.append((None, root_bgp))

    # Populate routing instances - VRF
    for ri in config_root.findall('.//configuration/routing-instances/instance'):
        ri_bgp = ri.find('./protocols/bgp')
        if ri_bgp is None:
            continue
        routing_instances.append((ri.findtext('./name'), ri_bgp))

    # Loop through Instance + Group + Neighbor
    neighbors = []
    for ri_name, ri_bgp in routing_instances:
        for grp in ri_bgp.findall('./group'):
            grp_imports = [imp.text for imp in grp.findall("./import")]
            grp_as = grp.findtext("./peer-as")

            for ne in grp.findall("./neighbor"):
                # Skip inactive neighbors
                if "inactive" in ne.attrib:
                    continue

                ne_name = ne.findtext("./name")
                ne_description = ne.findtext("./description")

                # peer-as is frequently configured once on the group;
                # fall back to it when the neighbor has none of its own.
                ne_as = ne.findtext("./peer-as")
                if ne_as is None:
                    ne_as = grp_as

                # Neighbor-level imports replace the group's imports.
                ne_imports = [imp.text for imp in ne.findall("./import")]
                if not ne_imports:
                    ne_imports = grp_imports

                # Map import statements to prefix lists.  Policies that are
                # not defined as a policy-statement (unknown name, policy
                # expression, ...) simply reference no prefix-lists.
                prefix_lists = []
                for imp in ne_imports:
                    prefix_lists.extend(prefix_references.get(imp, ()))

                neighbors.append(BGPNeighbor(
                    rtr_name, ri_name,
                    ne_as, ne_name, ne_description,
                    prefix_lists,
                ))

    return neighbors
def extract_prefix_references(config_root):
    """
    Map each routing policy to the prefix-lists its terms reference.

    Only ``prefix-list-filter`` elements under ``term/from`` are considered.
    Return: dict with "policy-name" => [PrefixListRef1, PrefixListRef2, ...]
    (an empty list for a policy with no such references).
    """
    policy_path = ".//configuration/policy-options/policy-statement"
    references = {}
    for policy in config_root.findall(policy_path):
        references[policy.findtext("./name")] = [
            PrefixListRef(
                flt.findtext('./list_name'),
                # Presence of the <orlonger> child is what matters.
                flt.find('./orlonger') is not None,
            )
            for flt in policy.findall('./term/from/prefix-list-filter')
        ]
    return references
# A PrefixList from JunOS Config | |
def extract_prefix_lists(config_root, rtr_name):
    """
    Extract all prefix-lists from a JunOS XML config.

    Prefix-lists built using apply-path are skipped.
    Returns a list of PrefixList tuples, each tagged with `rtr_name`.
    """
    found = []
    for node in config_root.findall('./configuration/policy-options/prefix-list'):
        if node.find('./apply-path') is None:
            members = [item.text for item in node.findall('./prefix-list-item/name')]
            found.append(PrefixList(rtr_name, node.findtext('./name'), members))
    return found
# Main program | |
if __name__ == "__main__":
    # Only run the report when executed as a script, not when imported.
    main()