Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
#!/usr/bin/env python3
import glob
import re
import xml.etree.ElementTree as et
import json
import logging
import csv
logger = logging.getLogger("netflow-iface")
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s')
def main():
logger.setLevel(logging.INFO)
configs = parse_config()
with open("netflow-interfaces.csv", "w") as out_file:
out_csv = csv.DictWriter(out_file, [
"router",
"name",
"vrf",
"afi",
"ipv4",
"ipv6",
"netflow",
"description",
])
out_csv.writeheader()
for rtr, root in configs.items():
# Loop routers
for interface in iterate_ifaces(root):
interface['router'] = rtr
out_csv.writerow(interface)
# print("{:<16} {:<20}".format(
# rtr, json.dumps(interface)
# ))
def iterate_ifaces(root):
# VRF Table, map interface -> VRF name
if_vrf = {}
for vrf in root.findall('.//routing-instances/instance'):
vrf_name = vrf.findtext('./name')
for interface in vrf.findall('./interface'):
if_name = interface.findtext('./name')
if_vrf[if_name] = vrf_name
# Loop through interfaces
for interface in root.findall('.//interfaces/interface'):
if_name = interface.findtext("name")
# Skip template devices
if '*' in if_name: continue
# Yield main interface
entry, if_ag = interface_params(interface, if_name, if_vrf)
yield entry
# Loop units
for unit in interface.findall("./unit"):
unit_name = unit.findtext('./name')
full_name = "{}.{}".format(if_name,unit_name)
entry, _ = interface_params(unit, full_name, if_vrf, if_ag)
yield entry
FLOW_APPLY_GROUPS = set(['INTERFACE-BACKBONE', 'INTERFACE-CONNECTOR'])
def interface_params(unit, full_name, if_vrf, parent_ag=set()):
"""Parse paraemters of an interface or unit"""
description = unit.findtext('./description')
afi = set([f.tag for f in unit.findall('./family/*')])
routed = (len(unit.findall('./family/inet')) + len(unit.findall('./family/inet6')) > 0)
ipv4_addr = unit.findtext("./family/inet/address/name")
ipv6_addr = unit.findtext("./family/inet6/address/name")
include_groups = set([g.text for g in unit.findall('./apply-groups')])
exclude_groups = set([g.text for g in unit.findall('./apply-groups-except')])
ag = include_groups.union(parent_ag).difference(exclude_groups)
flow = bool( ag.intersection(FLOW_APPLY_GROUPS) )
if "INTERFACE-BACKBONE" in ag:
afi.update(["mpls", "iso"])
entry = {
'name': full_name,
'description': description,
'ipv4': ipv4_addr,
'ipv6': ipv6_addr,
'afi': " ".join(sorted(afi)),
'vrf': if_vrf.get(full_name, "RE" if routed else None),
'netflow': flow,
}
return entry, ag
def parse_config():
"""Parse all router's config"""
rv = {}
for config_file in glob.glob("../internet2-configs/rtsw/xml/*.xml"):
# Parse XML
try:
config_root = et.parse(config_file)
hostname = extract_hostname(config_root)
# Stash for return
rv[hostname] = config_root
except et.ParseError as e:
logger.debug("Error loading router configuration", exc_info=e)
pass
return rv
RE_HOSTNAME_CLEANUP = re.compile(r'-re\d', re.IGNORECASE)
def extract_hostname(config_root):
"""Return the sanitized router hostname"""
rtr_name = config_root.findtext('.//system/host-name')
rtr_name = RE_HOSTNAME_CLEANUP.sub('', rtr_name)
return rtr_name
if __name__ == "__main__":
main()