Source code for autopilot.introspection

# -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
#
# Autopilot Functional Test Tool
# Copyright (C) 2012-2013 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#


"""Package for introspection support.

This package contains the internal implementation of the autopilot
introspection mechanism, and probably isn't useful to most test authors.

"""

from __future__ import absolute_import

from dbus import DBusException, Interface
import logging
import subprocess
from time import sleep
from functools import partial
import os
import psutil
import six

from autopilot.introspection.backends import DBusAddress
from autopilot.introspection.constants import (
    AUTOPILOT_PATH,
    QT_AUTOPILOT_IFACE,
    AP_INTROSPECTION_IFACE,
)
from autopilot.introspection.dbus import (
    CustomEmulatorBase,
    DBusIntrospectionObject,
    get_classname_from_path,
)
from autopilot.introspection.utilities import (
    _get_bus_connections_pid,
    _pid_is_running,
)
from autopilot.dbus_handler import (
    get_session_bus,
    get_system_bus,
    get_custom_bus,
)


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Keep track of known connections during search. The list is emptied by
# _reset_known_connection_list() and rebound by
# _search_for_valid_connections() each time the bus is scanned.
connection_list = []


class ProcessSearchError(RuntimeError):
    """Raised when the search for an application process, or for its
    introspection connection on the bus, fails."""


def get_application_launcher(app_path):
    """Pick an :class:`ApplicationLauncher` for the binary at 'app_path'.

    The shared libraries the binary links against are listed with ``ldd``
    and matched case-insensitively: a Qt core library selects the Qt
    launcher, a Gtk library selects the Gtk launcher.

    :param app_path: Path of the executable to inspect.
    :returns: A new ``QtApplicationLauncher`` or ``GtkApplicationLauncher``
        instance, or ``None`` when neither toolkit library is listed.
    :raises RuntimeError: if ``ldd`` exits with a non-zero status.

    """
    # TODO: this is a teeny bit hacky - ldd is used to find out whether the
    # application links to certain libraries, on the assumption that linking
    # to libQt* or libGtk* means the application is introspectable. That
    # excludes any non-dynamically linked executables, which may need fixing
    # further down the line.
    try:
        raw_output = subprocess.check_output(
            ["ldd", app_path],
            universal_newlines=True
        )
    except subprocess.CalledProcessError as error:
        raise RuntimeError(error)
    linked_libraries = raw_output.strip().lower()

    links_qt = any(
        marker in linked_libraries
        for marker in ('libqtcore', 'libqt5core')
    )
    if links_qt:
        from autopilot.introspection.qt import QtApplicationLauncher
        return QtApplicationLauncher()
    if 'libgtk' in linked_libraries:
        from autopilot.introspection.gtk import GtkApplicationLauncher
        return GtkApplicationLauncher()
    return None


def get_application_launcher_from_string_hint(hint):
    """Return an instance of :class:`ApplicationLauncher` chosen by a string
    hint.

    :param hint: Toolkit name, compared case-insensitively. ``'qt'`` and
        ``'gtk'`` are the recognised values.
    :returns: A new launcher instance, or ``None`` for any other hint.

    """
    from autopilot.introspection.qt import QtApplicationLauncher
    from autopilot.introspection.gtk import GtkApplicationLauncher

    launcher_classes = {
        'qt': QtApplicationLauncher,
        'gtk': GtkApplicationLauncher,
    }
    launcher_class = launcher_classes.get(hint.lower())
    if launcher_class is None:
        return None
    return launcher_class()


def launch_application(launcher, application, *arguments, **kwargs):
    """Start *application* and hand back the resulting process object.

    :param launcher: An instance of the :class:`ApplicationLauncher` class.
        Its ``prepare_environment`` method is called with the application
        and the argument list, and returns the path and arguments that are
        actually launched.
    :param application: The application to launch, given as a string.
    :param arguments: Command line arguments for the application.
    :keyword launch_dir: Passed to ``launch_process`` as ``cwd``. Defaults
        to ``None``.
    :keyword capture_output: Passed to ``launch_process`` unchanged.
        Defaults to ``True``.
    :returns: Whatever ``launch_process`` returns.
    :raises TypeError: if *application* is not a string.
    :raises ValueError: if any other keyword argument is supplied.

    """
    if not isinstance(application, six.string_types):
        raise TypeError("'application' parameter must be a string.")

    working_dir = kwargs.pop('launch_dir', None)
    capture_output = kwargs.pop('capture_output', True)
    if kwargs:
        # Anything left over was not recognised; report the names sorted so
        # the message is stable.
        unknown = ', '.join(sorted(repr(key) for key in kwargs))
        raise ValueError("Unknown keyword arguments: %s." % unknown)

    path, args = launcher.prepare_environment(application, list(arguments))
    return launch_process(path, args, capture_output, cwd=working_dir)


class ApplicationLauncher(object):
    """Base class for objects that know how to launch an application with a
    certain type of introspection enabled.

    """

    def prepare_environment(self, app_path, arguments):
        """Set up the application, or its environment, for launching with
        autopilot support.

        This base implementation only raises; child classes override it.

        An overriding method *must* return a tuple of (*app_path*,
        *arguments*). Either of these can be altered by the method.

        :raises NotImplementedError: always, in this base class.

        """
        message = "Sub-classes must implement this method."
        raise NotImplementedError(message)


def launch_process(application, args, capture_output, **kwargs):
    """Start an autopilot-enabled process and return its process object.

    :param application: The executable to run; first item of the command
        line.
    :param args: Iterable of arguments appended after *application*.
    :param capture_output: When true, stdout and stderr are connected to
        pipes; otherwise ``None`` is passed for both.
    :param kwargs: Extra keyword arguments forwarded to
        ``subprocess.Popen``.
    :returns: The ``subprocess.Popen`` object for the new process.

    """
    commandline = [application] + list(args)
    logger.info("Launching process: %r", commandline)

    output_target = subprocess.PIPE if capture_output else None
    return subprocess.Popen(
        commandline,
        stdin=subprocess.PIPE,
        stdout=output_target,
        stderr=output_target,
        close_fds=True,
        # Run os.setsid in the child before exec.
        preexec_fn=os.setsid,
        universal_newlines=True,
        **kwargs
    )


def get_autopilot_proxy_object_for_process(
    process,
    emulator_base,
    dbus_bus='session'
):
    """Look up the autopilot proxy object belonging to *process*.

    The search is delegated to
    :func:`get_proxy_object_for_existing_process` using ``process.pid``;
    the resulting proxy is then given the process via ``set_process``.

    :param process: The process object of the launched application.
    :param emulator_base: The custom emulator to create the proxy object
        with.
    :param dbus_bus: The bus to search; defaults to ``'session'``.
    :returns: The proxy object for the process.
    :raises: **RuntimeError** if no autopilot interface was found.

    """
    proxy = get_proxy_object_for_existing_process(
        process.pid,
        process=process,
        emulator_base=emulator_base,
        dbus_bus=dbus_bus,
    )
    proxy.set_process(process)
    return proxy


def get_proxy_object_for_existing_process(
        pid=None, dbus_bus='session', connection_name=None, process=None,
        object_path=AUTOPILOT_PATH, application_name=None,
        emulator_base=None):
    """Return a single proxy object for an application that is already
    running (i.e. launched outside of Autopilot).

    The bus named by **dbus_bus** is searched for an application matching
    the search criteria, and the proxy object is created using the custom
    emulator **emulator_base** (defaults to None).

    For example, when the application's PID is known::

        app_proxy = get_proxy_object_for_existing_process(pid=app_pid)

    Several criteria can be combined, for instance **pid** and
    **connection_name**::

        app_proxy = get_proxy_object_for_existing_process(
            pid=app_pid, connection_name='org.gnome.gedit')

    The same search for an application on the system bus::

        app_proxy = get_proxy_object_for_existing_process(
            dbus_bus='system', pid=app_pid,
            connection_name='org.gnome.gedit')

    The application can also be found by name alone. An example for an
    application running on a custom bus, searched for by name::

        app_proxy = get_proxy_object_for_existing_process(
            application_name='qmlscene',
            dbus_bus='unix:abstract=/tmp/dbus-IgothuMHNk')

    :param pid: The PID of the application to search for.
    :param dbus_bus: A string containing either 'session', 'system' or the
        custom bus's name (i.e. 'unix:abstract=/tmp/dbus-IgothuMHNk').
    :param connection_name: A string containing the DBus connection name to
        use with the search criteria.
    :param process: An optional process object. Its ``pid`` attribute is
        used when **pid** is not given, and it is passed on to the search.
    :param object_path: A string containing the object path to use as the
        search criteria. Defaults to
        :py:data:`autopilot.introspection.constants.AUTOPILOT_PATH`.
    :param application_name: A string containing the application's name to
        search for.
    :param emulator_base: The custom emulator to create the resulting proxy
        object with.

    :raises ProcessSearchError: if no search criteria match.
    :raises RuntimeError: if the search criteria results in many matches.
    :raises RuntimeError: if both ``process`` and ``pid`` are supplied, but
        ``process.pid != pid``.

    """
    if process is not None:
        if pid is None:
            pid = process.pid
        elif pid != process.pid:
            raise RuntimeError("Supplied PID and process.pid do not match.")

    if pid is not None and not _pid_is_running(pid):
        raise ProcessSearchError("PID %d could not be found" % pid)

    candidates = _get_dbus_addresses_from_search_parameters(
        pid, dbus_bus, connection_name, object_path, process)

    if application_name:
        def _is_named_application(address):
            # The class name of the root object's path is compared with
            # the requested application name.
            root_path = address.introspection_iface.GetState('/')[0][0]
            return get_classname_from_path(root_path) == application_name

        candidates = [
            address for address in candidates
            if _is_named_application(address)
        ]

    if not candidates:
        raise ProcessSearchError("Search criteria returned no results")
    if len(candidates) > 1:
        raise RuntimeError("Search criteria returned multiple results")

    return _make_proxy_object(candidates[0], emulator_base)
def _get_dbus_addresses_from_search_parameters(
        pid, dbus_bus, connection_name, object_path, process):
    """Returns a list of :py:class:`DBusAddress` for all successfully matched
    criteria.

    The bus is scanned up to ten times, sleeping one second after every scan
    that finds nothing. An empty list is returned if no scan succeeds.

    :raises ProcessSearchError: if *process* is supplied and has exited.

    """
    _reset_known_connection_list()

    for _ in range(10):
        # The child pid cache is only valid for a single scan of the bus.
        _get_child_pids.reset_cache()
        if process is not None and not _process_is_running(process):
            return_code = process.poll()
            raise ProcessSearchError(
                "Process exited with exit code: %d"
                % return_code
            )

        bus = _get_dbus_bus_from_string(dbus_bus)

        valid_connections = _search_for_valid_connections(
            pid,
            bus,
            connection_name,
            object_path
        )

        if len(valid_connections) >= 1:
            return [_get_dbus_address_object(name, object_path, bus) for name
                    in valid_connections]

        sleep(1)
    return []


def _reset_known_connection_list():
    """Empty the module-level ``connection_list`` in place."""
    global connection_list
    del connection_list[:]


def _search_for_valid_connections(pid, bus, connection_name, object_path):
    """Return the connection names on *bus* that match the search criteria.

    Only names that are not in the module-level ``connection_list`` are
    examined.

    NOTE(review): ``connection_list`` is rebound to the names examined in
    this scan rather than extended with them, so names skipped by this scan
    become candidates again on the next one - confirm this is intended.

    """
    global connection_list

    def _get_unchecked_connections(all_connections):
        return list(set(all_connections).difference(set(connection_list)))

    possible_connections = _get_possible_connections(bus, connection_name)
    connection_list = _get_unchecked_connections(possible_connections)
    valid_connections = _get_valid_connections(
        connection_list,
        bus,
        pid,
        object_path
    )

    return valid_connections


def _process_is_running(process):
    """Return True if ``process.poll()`` reports no exit code yet."""
    return process.poll() is None


def _get_valid_connections(connections, bus, pid, object_path):
    """Filter *connections* down to those matching *pid* and *object_path*,
    keeping one connection per pid.

    """
    filter_fn = partial(_match_connection, bus, pid, object_path)
    valid_connections = filter(filter_fn, connections)

    unique_connections = _dedupe_connections_on_pid(valid_connections, bus)

    return unique_connections


def _dedupe_connections_on_pid(valid_connections, bus):
    """Return the connections with only the first one kept for each pid."""
    seen_pids = []
    deduped_connections = []

    for connection in valid_connections:
        pid = _get_bus_connections_pid(bus, connection)
        if pid not in seen_pids:
            seen_pids.append(pid)
            deduped_connections.append(connection)
    return deduped_connections


def _get_dbus_address_object(connection_name, object_path, bus):
    """Build the :py:class:`DBusAddress` for a connection name and path."""
    return DBusAddress(bus, connection_name, object_path)


def _get_dbus_bus_from_string(dbus_string):
    """Return the bus for 'session', 'system' or a custom bus address."""
    if dbus_string == 'session':
        return get_session_bus()
    elif dbus_string == 'system':
        return get_system_bus()
    else:
        return get_custom_bus(dbus_string)


def _get_possible_connections(bus, connection_name):
    """Return the names on *bus*, restricted to *connection_name* when one
    is given.

    """
    all_connection_names = bus.list_names()
    if connection_name is None:
        return all_connection_names
    else:
        matching_connections = [
            c for c in all_connection_names if c == connection_name]
        return matching_connections


def _match_connection(bus, pid, path, connection_name):
    """Does the connection match our search criteria?"""
    success = True
    if pid is not None:
        success = _connection_matches_pid(bus, connection_name, pid)
    if success:
        success = _connection_has_path(bus, connection_name, path)
    return success


def _connection_matches_pid(bus, connection_name, pid):
    """Given a PID checks whether it or its children are connected on this
    bus.

    The bus daemon's own name and connections owned by this script's
    process never match. A DBusException while looking up the connection's
    pid is logged and treated as no match.

    """
    if connection_name == 'org.freedesktop.DBus':
        return False
    try:
        if _bus_pid_is_our_pid(bus, connection_name, pid):
            return False
        bus_pid = _get_bus_connections_pid(bus, connection_name)
    except DBusException as e:
        logger.info(
            "dbus.DBusException while attempting to get PID for %s: %r",
            connection_name, e)
        return False
    eligible_pids = [pid] + _get_child_pids(pid)
    return bus_pid in eligible_pids


def _bus_pid_is_our_pid(bus, connection_name, pid):
    """Returns True if this scripts pid is the bus connections pid supplied.

    The *pid* argument is accepted but not used by the comparison.

    """
    bus_pid = _get_bus_connections_pid(bus, connection_name)
    return bus_pid == os.getpid()


def _connection_has_path(bus, connection_name, path):
    """Ensure the connection has the path that we expect to be there."""
    try:
        _check_connection_has_ap_interface(bus, connection_name, path)
        return True
    except DBusException:
        return False


def _check_connection_has_ap_interface(bus, connection_name, path):
    """Simple check if a bus with connection + path provide the Autopilot
    Introspection Interface.

    :raises: **DBusException** if it does not.

    """
    obj = bus.get_object(connection_name, path)
    obj_iface = Interface(obj, 'com.canonical.Autopilot.Introspection')
    obj_iface.GetVersion()


class _cached_get_child_pids(object):
    """Get a list of all child process Ids, for the given parent.

    Since we call this often, and it's a very expensive call, we optimise this
    such that the return value will be cached for each scan through the dbus
    bus.

    Calling reset_cache() at the end of each dbus scan will ensure that you get
    fresh values on the next call.

    Note that the cache is not keyed on the pid: until reset_cache() is
    called, the first result is returned whatever pid is passed.

    """

    def __init__(self):
        self._cached_result = None

    def __call__(self, pid):
        if self._cached_result is None:
            parent = psutil.Process(pid)
            # psutil 2.0 renamed Process.get_children() to children() and
            # later releases removed the old name, so prefer the new name
            # and fall back to the old one.
            list_children = getattr(parent, 'children', None)
            if list_children is None:
                list_children = parent.get_children
            self._cached_result = [
                child.pid for child in list_children(recursive=True)
            ]
        return self._cached_result

    def reset_cache(self):
        """Discard the cached result so the next call queries psutil."""
        self._cached_result = None


_get_child_pids = _cached_get_child_pids()


def _make_proxy_object(data_source, emulator_base):
    """Returns a root proxy object given a DBus service name.

    :param data_source: The backend address object to introspect.
    :param emulator_base: Extra base class for the proxy class; a default
        subclass of ``CustomEmulatorBase`` is created when it is None.
    :raises RuntimeError: if the root object cannot be found.

    """
    proxy_bases = _get_proxy_object_base_classes(data_source)
    if emulator_base is None:
        emulator_base = type('DefaultEmulatorBase', (CustomEmulatorBase,), {})
    proxy_bases = proxy_bases + (emulator_base, )
    cls_name, cls_state = _get_proxy_object_class_name_and_state(data_source)

    # Merge the object hierarchy.
    clsobj = type(str("%sBase" % cls_name), proxy_bases, {})

    proxy_class = type(str(cls_name), (clsobj,), {})

    try:
        dbus_tuple = data_source.introspection_iface.GetState("/")[0]
        path, state = dbus_tuple
        return proxy_class(state, path, data_source)
    except IndexError:
        raise RuntimeError("Unable to find root object of %r" % proxy_class)


def _get_proxy_object_base_classes(backend):
    """Return tuple of the base classes to use when creating a proxy object
    for the given service name & path.

    :raises: **RuntimeError** if the autopilot interface cannot be found.

    """

    bases = [ApplicationProxyObject]

    intro_xml = backend.dbus_introspection_iface.Introspect()
    if AP_INTROSPECTION_IFACE not in intro_xml:
        raise RuntimeError(
            "Could not find Autopilot interface on DBus backend '%s'" %
            backend)

    if QT_AUTOPILOT_IFACE in intro_xml:
        from autopilot.introspection.qt import QtObjectProxyMixin
        bases.append(QtObjectProxyMixin)

    return tuple(bases)


def _get_proxy_object_class_name_and_state(backend):
    """Return the class name and root state dictionary."""
    object_path, object_state = backend.introspection_iface.GetState("/")[0]
    return get_classname_from_path(object_path), object_state


class ApplicationProxyObject(DBusIntrospectionObject):
    """A class that better supports query data from an application."""

    def __init__(self, state, path, backend):
        super(ApplicationProxyObject, self).__init__(state, path, backend)
        # Stays None until set_process() is called.
        self._process = None

    def set_process(self, process):
        """Set the subprocess.Popen object of the process that this is a proxy
        for.

        You should never normally need to call this method.

        """
        self._process = process

    @property
    def pid(self):
        """The pid of the process stored by set_process()."""
        return self._process.pid

    @property
    def process(self):
        """The process object stored by set_process(), or None."""
        return self._process

    def kill_application(self):
        """Kill the running process that this is a proxy for using
        'kill `pid`'."""
        subprocess.call(["kill", "%d" % self._process.pid])