Source code for morse.services.supervision_services
import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import service
from morse.core import status, blenderapi
from morse.blender.main import reset_objects as main_reset, close_all as main_close, quit as main_terminate
from morse.core.exceptions import *
@service(component = "simulation")
[docs]def list_robots():
""" Return a list of the robots in the current scenario
Uses the list generated during the initialisation of the scenario
"""
return [obj.name for obj in blenderapi.persistantstorage().robotDict.keys()]
@service(component = "simulation")
[docs]def reset_objects():
""" Restore all simulation objects to their original position
Upon receiving the request using sockets, call the
'reset_objects' function located in morse/blender/main.py
"""
contr = blenderapi.controller()
main_reset(contr)
return "Objects restored to initial position"
@service(component = "simulation")
[docs]def quit():
""" Cleanly quit the simulation
"""
contr = blenderapi.controller()
main_close(contr)
main_terminate(contr)
@service(component = "simulation")
[docs]def terminate():
""" Terminate the simulation (no finalization done!)
"""
contr = blenderapi.controller()
main_terminate(contr)
@service(component = "simulation")
[docs]def activate(component_name):
""" Enable the functionality of the component specified
"""
try:
blenderapi.persistantstorage().componentDict[component_name]._active = True
except KeyError as detail:
logger.warn("Component %s not found. Can't activate" % detail)
raise MorseRPCTypeError("Component %s not found. Can't activate" % detail)
@service(component = "simulation")
[docs]def deactivate(component_name):
""" Stop the specified component from calling its default_action method
"""
try:
blenderapi.persistantstorage().componentDict[component_name]._active = False
except KeyError as detail:
logger.warn("Component %s not found. Can't deactivate" % detail)
raise MorseRPCTypeError("Component %s not found. Can't deactivate" % detail)
@service(component = "simulation")
[docs]def suspend_dynamics():
""" Suspends physics for all object in the scene.
"""
scene = blenderapi.scene()
for object in scene.objects:
object.suspendDynamics()
return "Physics is suspended"
@service(component = "simulation")
[docs]def restore_dynamics():
""" Resumes physics for all object in the scene.
"""
scene = blenderapi.scene()
for object in scene.objects:
object.restoreDynamics()
return "Physics is resumed"
@service(component = "simulation")
[docs]def details():
"""Returns a structure containing all possible details
about the simulation currently running, including
the list of robots, the list of services and datastreams,
the list of middleware in use, etc.
"""
simu = blenderapi.persistantstorage()
details = {}
# Retrieves the list of services and associated middlewares
services = {}
services_iface = {}
for n, i in simu.morse_services.request_managers().items():
services.update(i.services())
for cmpt in i.services():
services_iface.setdefault(cmpt, []).append(n)
def cmptdetails(c):
c = simu.componentDict[c.name]
cmpt = {"type": type(c).__name__,}
if c.name() in services:
cmpt["services"] = services[c.name()]
cmpt["service_interfaces"] = services_iface[c.name()]
if c.name() in simu.datastreams:
stream = simu.datastreams[c.name()]
cmpt["stream"] = stream[0]
cmpt["stream_interfaces"] = stream[1]
return cmpt
def robotdetails(r):
robot = {"name": r.name(),
"type": type(r).__name__,
"components": {c.name:cmptdetails(c) for c in r.components}
}
return robot
for n, i in simu.datastreamDict.items():
pass
details['robots'] = [robotdetails(r) for n, r in simu.robotDict.items()]
return details