Source code for morse.multinode.hla

import logging; logger = logging.getLogger("morse." + __name__)
import mathutils
import os

from morse.core.exceptions import MorseMultinodeError
from morse.core.multinode import SimulationNodeClass

logger.setLevel(logging.INFO)

try:
    import hla
    import hla.rti as rti
    import hla.omt as fom
except (ImportError, SyntaxError):
    logger.error("No HLA binding found or imported!")
    raise MorseMultinodeError("'hla' python not found, or has syntax errors")

"""
Defines the 'MorseVector' type that will be transfered on the HLA federation.
"""
MorseVector = fom.HLAfixedArray("MorseVector", fom.HLAfloat32LE, 3)

[docs]class MorseAmbassador(rti.FederateAmbassador): """ The Federate Ambassador of the MORSE node. """ def __init__(self, rtia, federation, time_regulation, time, gl): self.objects = [] self.rtia_ = rtia self.constrained = False self.regulator = False self.tag = time_regulation self.federation = federation self.current_time = time self.lookahead = 0 self.gl = gl logger.debug("MorseAmbassador created.")
[docs] def initialize(self): try: self.out_robot = self.rtia_.getObjectClassHandle("Robot") self.out_position = self.rtia_.getAttributeHandle("position", self.out_robot) self.out_orientation = self.rtia_.getAttributeHandle( "orientation", self.out_robot) except rti.NameNotFound: logger.error("'Robot' (or attributes) not declared in FOM." + \ "Your '.fed' file may not be up-to-date.") return False self.rtia_.publishObjectClass(self.out_robot, [self.out_position, self.out_orientation]) for obj, local_robot_data in self.gl.robotDict.items(): self.objects.append(self.rtia_.registerObjectInstance( self.out_robot, obj.name)) logger.info( "Pose of robot %s will be published on the %s federation.", obj.name, self.federation) self.in_robot = self.rtia_.getObjectClassHandle("Robot") self.in_position = self.rtia_.getAttributeHandle("position", self.in_robot) self.in_orientation = self.rtia_.getAttributeHandle("orientation", self.in_robot) self.rtia_.subscribeObjectClassAttributes(self.in_robot, [self.in_position, self.in_orientation]) # TSO initialization if self.tag: self.tag = False self.lookahead = 1 self.current_time = self.rtia_.queryFederateTime() logger.debug("Initial Federate time is %s", self.current_time) self.rtia_.enableTimeConstrained() self.rtia_.enableTimeRegulation(self.current_time, self.lookahead) while (not self.constrained and not self.regulator and not self.tag): self.rtia_.tick(0, 1) logger.debug("MorseAmbassador initialized")
[docs] def terminate(self): for obj in self.objects: self.rtia_.deleteObjectInstance(obj, self.rtia_.getObjectInstanceName(obj))
[docs] def discoverObjectInstance(self, object, objectclass, name): logger.info( "Robot %s will have its pose reflected in the current node...", name)
[docs] def reflectAttributeValues(self, object, attributes, tag, order, transport, time=None, retraction=None): scene = self.gl.getCurrentScene() obj_name = self.rtia_.getObjectInstanceName(object) logger.debug("RAV %s", obj_name) try: obj = scene.objects[obj_name] if self.in_position in attributes: pos, offset = MorseVector.unpack(attributes[self.in_position]) # Update the positions of the robots obj.worldPosition = pos if self.in_orientation in attributes: ori, offset = MorseVector.unpack(attributes[self.in_orientation]) # Update the orientations of the robots obj.worldOrientation = mathutils.Euler(ori).to_matrix() except KeyError as detail: logger.debug("Robot %s not found in this simulation scenario," + \ "but present in another node. Ignoring it!", obj_name)
[docs] def timeConstrainedEnabled(self, time): logger.debug("Constrained at time %s", time) self.current_time = time self.constrained = True
[docs] def timeRegulationEnabled(self, time): logger.debug("Regulator at time %s", time) self.current_time = time self.regulator = True
[docs] def timeAdvanceGrant(self, time): self.current_time = time self.tag = True
[docs]class HLANode(SimulationNodeClass): """ Implements multinode simulation using HLA. """ time_sync = False fom = "morse.fed" federation = "MORSE"
[docs] def initialize(self): """ Initializes HLA (connection to RTIg, FOM file, publish robots...) """ logger.info("Initializing HLA node.") if os.getenv("CERTI_HTTP_PROXY") == None: os.environ["CERTI_HTTP_PROXY"] = "" os.environ["CERTI_HOST"] = str(self.host) os.environ["CERTI_TCP_PORT"] = str(self.port) logger.debug("CERTI_HTTP_PROXY= %s", os.environ["CERTI_HTTP_PROXY"]) logger.debug("CERTI_HOST= %s", os.environ["CERTI_HOST"]) logger.debug("CERTI_TCP_PORT= %s", os.environ["CERTI_TCP_PORT"]) try: logger.debug("Creating RTIA...") self.rtia = rti.RTIAmbassador() logger.debug("RTIA created!") try: self.rtia.createFederationExecution(self.federation, self.fom) logger.info("%s federation created", self.federation) except rti.FederationExecutionAlreadyExists: logger.debug("%s federation already exists", self.federation) except rti.CouldNotOpenFED: logger.error("FED file not found! " + \ "Please check that the '.fed' file is in the CERTI " + \ "search path of RTIg.") return False except rti.ErrorReadingFED: logger.error("Error when reading FED file! " + \ "Please check the '.fed' file syntax.") return False logger.debug("Creating MorseAmbassador...") self.morse_ambassador = MorseAmbassador(self.rtia, self.federation, self.time_sync, 0, self.gl) try: self.rtia.joinFederationExecution(self.node_name, self.federation, self.morse_ambassador) except rti.FederateAlreadyExecutionMember: logger.error("A Federate with name %s has already registered."+\ " Change the name of your federate or " + \ "check your federation architecture.", self.node_name) return False except rti.CouldNotOpenFED: logger.error("FED file not found! Please check that the " + \ "'.fed' file is in the CERTI search path.") return False except rti.ErrorReadingFED: logger.error("Error when reading FED file! "+ \ "Please check the '.fed' file syntax.") return False if self.morse_ambassador.initialize() == False: return False logger.info("HLA middleware initialized.") except Exception as error: logger.error("Error when connecting to the RTIg: %s." + \ "Please check your HLA network configuration.", error) raise
[docs] def finalize(self): """ Close all open HLA connections. """ logger.info("Resigning from the HLA federation") if self.morse_ambassador: self.morse_ambassador.terminate() self.rtia.resignFederationExecution( rti.ResignAction.DeleteObjectsAndReleaseAttributes)
[docs] def synchronize(self): self.morse_ambassador.tag = False scene = self.gl.getCurrentScene() t = self.morse_ambassador.current_time + self.morse_ambassador.lookahead for obj in self.morse_ambassador.objects: obj_name = self.rtia.getObjectInstanceName(obj) obj_pos = scene.objects[obj_name].worldPosition.to_tuple() obj_ori = scene.objects[obj_name].worldOrientation.to_euler() hla_att = { self.morse_ambassador.out_position: MorseVector.pack([obj_pos[0], obj_pos[1], obj_pos[2]]), self.morse_ambassador.out_orientation: MorseVector.pack([obj_ori.x, obj_ori.y, obj_ori.z])} try: self.rtia.updateAttributeValues(obj, hla_att, "update", t) except rti.InvalidFederationTime: logger.debug("Invalid time for UAV: %s; Federation time is %s", t, self.rtia.queryFederateTime()) if self.time_sync: self.rtia.timeAdvanceRequest(t) while (not self.morse_ambassador.tag): self.rtia.tick(0, 1) logger.debug("Node simulation time:" + \ self.morse_ambassador.current_time) else: self.rtia.tick()