# -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
# Copyright 2012 Canonical
# Author: Thomi Richards
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
"""Package for introspection support."""
import dbus
from gi.repository import Gio
import logging
import subprocess
from testtools.content import text_content
from time import sleep
import os
import signal
from autopilot.introspection.constants import (
AUTOPILOT_PATH,
QT_AUTOPILOT_IFACE,
AP_INTROSPECTION_IFACE,
DBUS_INTROSPECTION_IFACE,
)
from autopilot.introspection.dbus import (
clear_object_registry,
DBusIntrospectionObject,
object_passes_filters,
get_session_bus,
)
from autopilot.utilities import get_debug_logger
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class ApplicationIntrospectionTestMixin(object):
    """A mix-in class to make launching applications for introspection easier.

    The methods here call ``self.addCleanup`` and ``self.addDetail``, so the
    class this is mixed into must provide them.

    .. important:: You should not instantiate this class directly. Instead, use
        one of the derived classes.

    """

    def launch_test_application(self, application, *arguments, **kwargs):
        """Launch *application* and retrieve a proxy object for the application.

        Use this method to launch a supported application and start testing it.
        The application can be specified as:

         * A Desktop file, either with or without a path component.
         * An executable file, either with a path, or one that is in the $PATH.

        Any extra positional *arguments* are handed to
        :meth:`prepare_environment` and then used as the command line
        arguments of the launched process.

        This method supports the following keyword arguments:

         * *launch_dir*. If set to a directory that exists the process will be
           launched from that directory.
         * *capture_output*. If set to True (the default), the process output
           will be captured and attached to the test as test detail.

        :raises: **TypeError** if *application* is not a string.
        :raises: **ValueError** if unknown keyword arguments are passed.
        :return: A proxy object that represents the application. Introspection
            data is retrievable via this object.

        """
        if not isinstance(application, basestring):
            raise TypeError("'application' parameter must be a string.")
        cwd = kwargs.pop('launch_dir', None)
        capture_output = kwargs.pop('capture_output', True)
        # Anything still left in kwargs is an option we do not understand.
        if kwargs:
            raise ValueError("Unknown keyword arguments: %s." %
                             (', '.join(repr(k) for k in kwargs)))

        if application.endswith('.desktop'):
            # Swap the desktop file for the executable it refers to.
            proc = Gio.DesktopAppInfo.new(application)
            application = proc.get_executable()

        path, args = self.prepare_environment(application, list(arguments))

        process = launch_autopilot_enabled_process(path,
                                                   args,
                                                   capture_output,
                                                   cwd=cwd)
        # Make sure the process is torn down (and its output attached) when
        # the test finishes.
        self.addCleanup(self._kill_process_and_attach_logs, process)
        return get_autopilot_proxy_object_for_process(process)

    def prepare_environment(self, app_path, arguments):
        """Prepare the application, or environment to launch with autopilot-support.

        This base implementation only raises; it exists so child classes can
        override it. An override *must* return a tuple of
        (*app_path*, *arguments*). Either of these can be altered by the
        override.

        :raises: **NotImplementedError** always, in this base implementation.

        """
        raise NotImplementedError("Sub-classes must implement this method.")

    def _kill_process_and_attach_logs(self, process):
        """Kill *process*, clean up its process group and attach its output.

        *process* is a ``subprocess.Popen`` object. After killing it, this
        waits up to 10 seconds for it to exit, sends SIGTERM to its process
        group, and finally adds whatever output was captured to the test as
        the 'process-stdout' and 'process-stderr' details.

        """
        process.kill()
        logger.info("waiting for process to exit.")
        for _ in range(10):
            # Popen.returncode is only refreshed by poll()/wait()/communicate(),
            # so poll explicitly rather than reading the attribute.
            if process.poll() is not None:
                break
            sleep(1)
        else:
            logger.info("Terminating process group, since it hasn't exited after 10 seconds.")

        # launch_autopilot_enabled_process() starts the process with
        # os.setsid, so its pid is also its process group id. Signal the whole
        # group so that children which outlived the parent cannot keep the
        # output pipes open and stall communicate() below.
        try:
            os.killpg(process.pid, signal.SIGTERM)
        except OSError:
            # The group is already gone - nothing left to terminate.
            pass

        stdout, stderr = process.communicate()
        # communicate() yields None for a stream that was not captured; there
        # is nothing to attach in that case.
        for detail_name, output in (('process-stdout', stdout),
                                    ('process-stderr', stderr)):
            if output is not None:
                self.addDetail(detail_name, text_content(output))
def launch_autopilot_enabled_process(application, args, capture_output, **kwargs):
    """Start *application* with *args* and return the running process.

    When *capture_output* is true the child's stdout and stderr are connected
    to pipes, otherwise they are left alone. stdin is always a pipe. The child
    is started through ``os.setsid``. Any extra keyword arguments are passed
    straight to ``subprocess.Popen``.

    :return: The ``subprocess.Popen`` object for the launched process.

    """
    argv = [application] + list(args)
    logger.info("Launching process: %r", argv)

    output_target = subprocess.PIPE if capture_output else None
    return subprocess.Popen(
        argv,
        stdin=subprocess.PIPE,
        stdout=output_target,
        stderr=output_target,
        close_fds=True,
        preexec_fn=os.setsid,
        **kwargs)
def get_child_pids(pid):
    """Return *pid* followed by the ids of all of its descendant processes.

    Descendants are discovered level by level by asking ``ps`` for the direct
    children of every process id found so far.

    """
    def direct_children(parent_pid):
        # A failing 'ps' invocation is treated as "no children".
        ps_command = ['ps', '-o', 'pid', '--ppid', str(parent_pid), '--noheaders']
        try:
            listing = subprocess.check_output(ps_command)
        except subprocess.CalledProcessError:
            return []
        return [int(field) for field in listing.split()]

    found = [pid]
    found.extend(direct_children(pid))
    # Entries before the cursor have already been expanded; entries from the
    # cursor onwards still need their own children looked up.
    cursor = 1
    while cursor < len(found):
        found.extend(direct_children(found[cursor]))
        cursor += 1
    return found
def get_autopilot_proxy_object_for_process(process):
    """Return the autopilot proxy object for the given *process*.

    The session bus is scanned once a second, for up to ten attempts, for a
    connection that belongs to *process* (or one of its children) and from
    which a proxy object can be built.

    :raises: **RuntimeError** if no autopilot interface was found.

    """
    pid = process.pid

    # FIXME: The libindicate python bindings currently offer no way to read a
    # server property. All we can do is walk every service on the session bus,
    # keep the ones owned by the process id we were given, and look for the
    # autopilot interface by hand.
    #
    # This sucks, and should be replaced with something more elegant.
    session_bus = get_session_bus()
    bus_object = session_bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
    bus_iface = dbus.Interface(bus_object, 'org.freedesktop.DBus')

    # The object registry is specific to a dbus service, and a new service has
    # just been started, so drop the old types. A better mechanism is needed.
    clear_object_registry()

    logger.info("Looking for autopilot interface for PID %d (and children)", pid)

    # Give the process 10 seconds of grace time to bring its interface up.
    for _attempt in range(10):
        candidate_pids = get_child_pids(pid)
        get_debug_logger().debug("Searching for eligible PIDs: %r", candidate_pids)
        for connection_name in session_bus.list_names():
            try:
                owner_pid = bus_iface.GetConnectionUnixProcessID(connection_name)
                if owner_pid not in candidate_pids:
                    continue
                # This connection belongs to one of our PIDs. It might still
                # not be the one we want, in which case building the proxy
                # raises and we move on to the next name.
                proxy = make_proxy_object_from_service_name(connection_name, AUTOPILOT_PATH)
                proxy.set_process(process)
                return proxy
            except Exception as e:
                logger.warning("Caught exception while searching for autopilot interface: '%r'", e)
        sleep(1)

    raise RuntimeError("Unable to find Autopilot interface.")
def make_proxy_object_from_service_name(service_name, obj_path):
    """Build and return the root proxy object for a DBus service name.

    A new class is created on the fly: it is named after the class name
    reported by the service, derives from the base classes suitable for that
    service, and carries the service name and object path as its
    ``DBUS_SERVICE`` and ``DBUS_OBJECT`` attributes.

    """
    # The arguments may arrive as dbus.String or QString instances; normalise
    # them once here instead of at every call site.
    service_name, obj_path = str(service_name), str(obj_path)

    base_classes = get_proxy_object_base_clases(service_name, obj_path)
    # Only the class name is needed here; the state is discarded.
    class_name, _unused_state = get_proxy_object_class_name_and_state(service_name, obj_path)

    class_attributes = {
        'DBUS_SERVICE': service_name,
        'DBUS_OBJECT': obj_path,
    }
    proxy_class = type(str(class_name), base_classes, class_attributes)
    return proxy_class.get_root_instance()
def get_proxy_object_base_clases(service_name, obj_path):
    """Return the tuple of base classes for a proxy object of the given
    service name & path.

    The tuple always starts with ``ApplicationProxyObect``. The Qt mix-in is
    added when the service's introspection XML mentions the Qt autopilot
    interface.

    :raises: **RuntimeError** if the autopilot interface cannot be found.

    """
    remote_object = get_session_bus().get_object(service_name, obj_path)
    intro_xml = dbus.Interface(remote_object, DBUS_INTROSPECTION_IFACE).Introspect()

    if AP_INTROSPECTION_IFACE not in intro_xml:
        raise RuntimeError("Could not find Autopilot interface on service name '%s'" % service_name)

    if QT_AUTOPILOT_IFACE not in intro_xml:
        return (ApplicationProxyObect,)

    # Imported here so the Qt support module is only loaded when it is needed.
    from autopilot.introspection.qt import QtObjectProxyMixin
    return (ApplicationProxyObect, QtObjectProxyMixin)
def get_proxy_object_class_name_and_state(service_name, obj_path):
    """Return the class name and state of the service's root object.

    This is the first entry of the reply to a ``GetState("/")`` call on the
    autopilot introspection interface.

    """
    remote_object = get_session_bus().get_object(service_name, obj_path)
    introspection = dbus.Interface(remote_object, AP_INTROSPECTION_IFACE)
    root_states = introspection.GetState("/")
    return root_states[0]
class ApplicationProxyObect(DBusIntrospectionObject):
    """A class that better supports query data from an application."""

    def __init__(self, state, path_info=None):
        super(ApplicationProxyObect, self).__init__(state, path_info)
        # Filled in later through set_process().
        self._process = None

    def select_single(self, type_name='*', **kwargs):
        """Get a single node from the introspection tree, with type equal to
        *type_name* and (optionally) matching the keyword filters present in
        *kwargs*.

        For example:

        >>> app.select_single('QPushButton', objectName='clickme')
        ... returns a QPushButton whose 'objectName' property is 'clickme'.

        If nothing is returned from the query, this method returns None.

        :raises: **ValueError** if the query returns more than one item. *If you
            want more than one item, use select_many instead*.

        """
        instances = self.select_many(type_name, **kwargs)
        if len(instances) > 1:
            raise ValueError("More than one item was returned for query")
        if not instances:
            return None
        return instances[0]

    def select_many(self, type_name='*', **kwargs):
        """Get a list of nodes from the introspection tree, with type equal to
        *type_name* and (optionally) matching the keyword filters present in
        *kwargs*.

        For example:

        >>> app.select_many('QPushButton', enabled=True)
        ... returns a list of QPushButtons that are enabled.

        If you only want to get one item, use select_single instead.

        """
        logger.debug("Selecting objects of %s with attributes: %r",
                     'any type' if type_name == '*' else 'type ' + type_name,
                     kwargs)

        path = "//%s" % type_name
        state_dicts = self.get_state_by_path(path)
        instances = [self.make_introspection_object(i) for i in state_dicts]
        # Build a real list: callers (select_single included) take len() of
        # the result and index into it, which a lazy filter object would not
        # support.
        return [i for i in instances if object_passes_filters(i, **kwargs)]

    def set_process(self, process):
        """Set the subprocess.Popen object of the process that this is a proxy for.

        You should never normally need to call this method.

        """
        self._process = process

    @property
    def pid(self):
        """The pid of the process object stored by set_process()."""
        return self._process.pid

    @property
    def process(self):
        """The process object stored by set_process(), or None if unset."""
        return self._process

    def kill_application(self):
        """Kill the running process that this is a proxy for using 'kill `pid`'."""
        subprocess.call(["kill", "%d" % self._process.pid])