"""An object representing a Node interface"""
from .exceptions import ModelException
# from .rsvp import RSVP_LSP
from .srlg import SRLG
[docs]class Interface(object):
"""An object representing a Node's Interface"""
def __init__(
self,
name,
cost,
capacity,
node_object,
remote_node_object,
circuit_id=None,
rsvp_enabled=True,
percent_reservable_bandwidth=100,
):
self.name = name
self.cost = cost
self.capacity = capacity
self.node_object = node_object
self.remote_node_object = remote_node_object
self.circuit_id = (
circuit_id # Has no role in Model object, only in Parallel_Model_Object
)
self.traffic = 0.0
self._failed = False
self._reserved_bandwidth = 0.0
self._srlgs = set()
self.rsvp_enabled = rsvp_enabled
self.percent_reservable_bandwidth = percent_reservable_bandwidth
@property
def _key(self):
"""Unique ID for interface object"""
return (self.name, self.node_object.name)
# Modify the __hash__ and __eq__ methods to make comparisons easier
def __eq__(self, other_object):
if not isinstance(other_object, Interface):
return NotImplemented
return [
self.node_object,
self.remote_node_object,
self.name,
self.capacity,
self.circuit_id,
] == [
other_object.node_object,
other_object.remote_node_object,
other_object.name,
other_object.capacity,
other_object.circuit_id,
]
def __ne__(self, other_object):
return [
self.node_object,
self.remote_node_object,
self.name,
self.capacity,
self.circuit_id,
] != [
other_object.node_object,
other_object.remote_node_object,
other_object.name,
other_object.capacity,
other_object.circuit_id,
]
def __hash__(self):
# return hash(tuple(sorted(self.__dict__.items())))
return hash(self.name + self.node_object.name)
def __repr__(self):
return "%s(name = %r, cost = %s, capacity = %s, node_object = %r, \
remote_node_object = %r, circuit_id = %r)" % (
self.__class__.__name__,
self.name,
self.cost,
self.capacity,
self.node_object,
self.remote_node_object,
self.circuit_id,
)
@property
def reservable_bandwidth(self):
"""
Amount of bandwidth available for rsvp lsp reservation. If interface is
not rsvp_enabled, then reservable_bandwidth is set to -1
"""
if self.rsvp_enabled is True:
res_bw = (
self.capacity * (self.percent_reservable_bandwidth / 100)
) - self.reserved_bandwidth
return round(res_bw, 1)
else:
return -1.0
@property
def reserved_bandwidth(self):
"""
Amount of interface capacity reserved by RSVP LSPs
"""
return round(self._reserved_bandwidth, 1)
@reserved_bandwidth.setter
def reserved_bandwidth(self, value):
"""
Puts logical guardrails on what reserved_bandwidth value can be
:param value: value of reserved_bandwidth
:return: None
"""
if isinstance(value, float) or isinstance(value, int):
self._reserved_bandwidth = value
else:
raise ModelException(
"Interface reserved_bandwidth must be a float or integer"
)
@property
def failed(self):
"""
Is interface failed? Boolean. It is NOT recommended to directly
modify this property. Rather, use Interface.fail or Interface.unfail.
:return: Boolean - is Interface failed?
"""
return self._failed
@failed.setter
def failed(self, status):
"""
Puts logical guardrails on conditions of interface failure status.
It is NOT recommended to modify this directly. Use the Model methods
fail_interface(interface_name, node_name) and
unfail_interface(interface_name, node_name)
:param status: boolean; input by user
:return: self._failed; boolean
"""
if not (isinstance(status, bool)):
raise ModelException("must be boolean value")
# Check for membership in any failed SRLGs
if status is False:
# Check for membership in any failed SRLGs
failed_srlgs = set([srlg for srlg in self.srlgs if srlg.failed is True])
if len(failed_srlgs) > 0:
self._failed = True
self.reserved_bandwidth = 0
raise ModelException(
"Interface must be failed since it is a member "
"of one or more SRLGs that are failed"
)
# Check to see if both nodes are failed = False
if (
self.node_object.failed is False
and self.remote_node_object.failed is False
):
self._failed = False
else:
self._failed = True
self.reserved_bandwidth = 0
else:
self._failed = True
self.reserved_bandwidth = 0
@property
def cost(self):
return self._cost
@cost.setter
def cost(self, cost):
if cost < 1:
raise ModelException("Interface cost cannot be less than 1")
if not isinstance(cost, int):
raise ModelException("Interface cost must be integer")
self._cost = cost
@property
def capacity(self):
return self._capacity
@capacity.setter
def capacity(self, capacity):
if not (capacity > 0):
raise ModelException("Interface capacity must be greater than 0")
self._capacity = capacity
[docs] def fail_interface(self, model):
"""
Updates the specified interface and the remote interface
with failed==True
:param model: model object containing self
"""
# find the remote interface
remote_interface = Interface.get_remote_interface(self, model)
# set the 2 interfaces to failed = True
self.failed = True
remote_interface.failed = True
[docs] def unfail_interface(self, model):
"""
Updates the specified interface and the remote interface
with failed==False
:param model: model object containing self
"""
# find the remote interface
remote_interface = Interface.get_remote_interface(self, model)
# check to see if the local and remote node are failed
if self.node_object.failed is False and self.remote_node_object.failed is False:
# Set the 2 interfaces to failed = False
self.failed = False
remote_interface.failed = False
else:
message = "Local and/or remote node are failed; cannot have unfailed interface on failed node"
raise ModelException(message)
[docs] def get_remote_interface(self, model):
"""
Returns Interface on other side of the Circuit containing self
:param model: model object holding self
:return: Interface object on remote side of Circuit containing self
"""
for interface in (interface for interface in model.interface_objects):
if (
interface.node_object.name == self.remote_node_object.name
and interface.circuit_id == self.circuit_id
):
remote_interface = interface
break
# Sanity check
if remote_interface.remote_node_object.interfaces(
model
) == self.node_object.interfaces(model):
return remote_interface
else: # pragma: no cover
print("Interface validation debug info follows:")
print(remote_interface.remote_node_object.interfaces(model))
print(self.node_object.interfaces(model))
message = (
"Internal Validation Error {} and {} fail validation checks; did you "
"forget to run update_simulation() on the model after making a change or "
"loading a model file?".format(remote_interface, self)
)
raise ModelException(message)
[docs] def get_circuit_object(self, model):
"""
Returns the circuit object from the model that an
interface is associated with.
:param model: model object containing self
:return: Circuit object containing self
"""
ckt = model.get_circuit_object_from_interface(self.name, self.node_object.name)
return ckt
[docs] def demands(self, model):
"""
Returns list of demands that egress the interface
:param model: model object containing self
:return: list of Demand objects egressing self
"""
dmd_set = set()
routed_demands = (
demand for demand in model.demand_objects if demand.path != "Unrouted"
)
for demand in routed_demands:
for dmd_path in demand.path:
# If dmd_path is an RSVP LSP and self is in dmd_path.path['interfaces'] ,
# look at the LSP path and get demands on the LSP and add them to dmd_set
# from .rsvp import RSVP_LSP
# if isinstance(dmd_path, RSVP_LSP):
# if self in dmd_path.path['interfaces']:
# dmd_set.add(demand)
#
# # If path is not an LSP, then it's a list of Interface or LSP
# # objects; look for self in dmd_path
#
# elif self in dmd_path:
# # num_paths += 1
# dmd_set.add(demand)
from .rsvp import RSVP_LSP
for object in dmd_path:
if isinstance(object, RSVP_LSP):
if self in object.path["interfaces"]:
dmd_set.add(demand)
elif self in dmd_path:
dmd_set.add(demand)
dmd_list = list(dmd_set)
return dmd_list
[docs] def lsps(self, model):
"""
Returns a list of RSVP LSPs that egress the interface
:param model: Model object
:return: list of RSVP LSPs that egress the interface
"""
lsp_set = set()
for lsp in (
lsp for lsp in model.rsvp_lsp_objects if "Unrouted" not in lsp.path
):
if self in lsp.path["interfaces"]:
lsp_set.add(lsp)
lsp_list = list(lsp_set)
return lsp_list
@property
def utilization(self):
"""Returns utilization percent = (self.traffic/self.capacity)*100"""
if self.traffic == "Down":
return "Int is down"
else:
util = (self.traffic / self.capacity) * 100
return float("%.2f" % util)
@property
def srlgs(self):
return self._srlgs
[docs] def add_to_srlg(self, srlg_name, model, create_if_not_present=False):
"""
Adds self to an SRLG with name=srlg_name in model. Also finds the
remote Interface object and adds that to the SRLG.
:param srlg_name: name of srlg
:param model: Model object
:param create_if_not_present: Boolean. Create the SRLG if it does not exist in model already. True will create SRLG in model; False will raise ModelException # noqa E501
:return: None
"""
# See if model has existing SRLG with name='srlg_name'
# get_srlg will be the SRLG object with name=srlg_name in model
# or it will be False if the SRLG with name=srlg_name does not
# exist in model
try:
get_srlg = model.get_srlg_object(srlg_name)
except ModelException:
get_srlg = False
if get_srlg is False:
# SRLG does not exist
if create_if_not_present is True:
new_srlg = SRLG(srlg_name, model)
model.srlg_objects.add(new_srlg)
self._srlgs.add(new_srlg)
# Add remote interface
remote_int = self.get_remote_interface(model)
remote_int._srlgs.add(new_srlg)
else:
msg = "An SRLG with name {} does not exist in the Model".format(
srlg_name
)
raise ModelException(msg)
else:
# SRLG does exist in model; add self to that SRLG
get_srlg.interface_objects.add(self)
self._srlgs.add(get_srlg)
# Add remote interface
remote_int = self.get_remote_interface(model)
get_srlg.interface_objects.add(remote_int)
remote_int._srlgs.add(get_srlg)
[docs] def remove_from_srlg(self, srlg_name, model):
"""
Removes self and remote interface object from SRLG with srlg_name in model.
:param srlg_name: name of SRLG
:param model: Model object
:return: none
"""
# See if model has existing SRLG with name='srlg_name'
# get_srlg will be the SRLG object with name=srlg_name in model
# or it will be False if the SRLG with name=srlg_name does not
# exist in model
try:
get_srlg = model.get_srlg_object(srlg_name)
except ModelException:
get_srlg = False
if get_srlg is False:
msg = "An SRLG with name {} does not exist in the Model".format(srlg_name)
raise ModelException(msg)
else:
# Remove self from SRLG
get_srlg.interface_objects.remove(self)
self._srlgs.remove(get_srlg)
# Remove remote interface from SRLG
remote_int = self.get_remote_interface(model)
get_srlg.interface_objects.remove(remote_int)
remote_int._srlgs.remove(get_srlg)
self.failed = False