# Copyright 2011 Canonical
# Author: Thomi Richards
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
"""Various classes for interacting with BAMF."""
from __future__ import absolute_import
import dbus
import dbus.glib
from gi.repository import Gio
from gi.repository import GLib
import os
from Xlib import display, X, protocol
from autopilot.emulators.dbus_handler import get_session_bus
from autopilot.utilities import Silence
# Names exported by ``from <this module> import *``.
__all__ = [
"Bamf",
"BamfApplication",
"BamfWindow",
]
# D-Bus bus name passed to every get_object() call made by the classes in
# this module.
_BAMF_BUS_NAME = 'org.ayatana.bamf'
# Module-wide Xlib display object; stays None until get_display() creates it.
_X_DISPLAY = None
def get_display():
    """Return the shared Xlib display object, creating it on first use.

    The display is built inside a ``Silence()`` context and then stored in the
    module-level ``_X_DISPLAY`` so that later calls hand back the same object.
    """
    global _X_DISPLAY
    # Fast path: a display was already created by an earlier call.
    if _X_DISPLAY is not None:
        return _X_DISPLAY
    with Silence():
        _X_DISPLAY = display.Display()
    return _X_DISPLAY
def _filter_user_visible(win):
    """Predicate reporting whether *win* should be treated as user-visible.

    Reads ``win.user_visible``. If that lookup raises ``dbus.DBusException``
    (for example because the DBus method has not been registered yet) the
    object is conservatively reported as not visible.
    """
    try:
        visible = win.user_visible
    except dbus.DBusException:
        visible = False
    return visible
class Bamf(object):
    """High-level class for interacting with Bamf from within a test.

    Use this class to inspect the state of running applications and open
    windows.

    """

    def __init__(self):
        matcher_path = '/org/ayatana/bamf/matcher'
        self.matcher_interface_name = 'org.ayatana.bamf.matcher'
        self.matcher_proxy = get_session_bus().get_object(
            _BAMF_BUS_NAME, matcher_path)
        self.matcher_interface = dbus.Interface(
            self.matcher_proxy, self.matcher_interface_name)

    def get_running_applications(self, user_visible_only=True):
        """Get a list of the currently running applications.

        If user_visible_only is True (the default), only applications
        visible to the user in the switcher will be returned.

        :return: A list of BamfApplication objects.

        """
        apps = [
            BamfApplication(p)
            for p in self.matcher_interface.RunningApplications()
        ]
        if user_visible_only:
            # Build a real list (rather than relying on what filter() returns)
            # so that callers can always use len() and indexing on the result.
            return [a for a in apps if _filter_user_visible(a)]
        return apps

    def get_running_applications_by_desktop_file(self, desktop_file):
        """Return a list of applications that have the desktop file *desktop_file*.

        This method may return an empty list, if no applications
        are found with the specified desktop file.

        """
        apps = []
        for a in self.get_running_applications():
            try:
                if a.desktop_file == desktop_file:
                    apps.append(a)
            except dbus.DBusException:
                # Best effort: an application we cannot query is skipped.
                pass
        return apps

    def get_application_by_xid(self, xid):
        """Return the application that has a child with the requested xid or None."""
        app_path = self.matcher_interface.ApplicationForXid(xid)
        # An empty path means no application owns that xid.
        if app_path:
            return BamfApplication(app_path)
        return None

    def get_open_windows(self, user_visible_only=True):
        """Get a list of currently open windows.

        If *user_visible_only* is True (the default), only applications visible
        to the user in the switcher will be returned.

        The result is sorted to be in stacking order.

        """
        windows = [
            BamfWindow(w)
            for w in self.matcher_interface.WindowStackForMonitor(-1)
        ]
        if user_visible_only:
            windows = [w for w in windows if _filter_user_visible(w)]
        # Now sort on stacking order.
        # We explicitly return a list since tests frequently try and use len()
        # on return values from these methods.
        windows.reverse()
        return windows

    def get_window_by_xid(self, xid):
        """Get the BamfWindow that matches the provided *xid*.

        :return: The first matching BamfWindow, or None if there is no match.

        """
        for path in self.matcher_interface.WindowPaths():
            # Construct each window exactly once: building a BamfWindow talks
            # to both DBus and the X server.
            window = BamfWindow(path)
            if window.x_id == xid:
                return window
        return None

    def wait_until_application_is_running(self, desktop_file, timeout):
        """Wait until a given application is running.

        :param string desktop_file: The name of the application desktop file.
        :param integer timeout: The maximum time to wait, in seconds. *If set to
         something less than 0, this method will wait forever.*

        :return: true once the application is found, or false if the application
         was not found until the timeout was reached.

        """
        desktop_file = os.path.split(desktop_file)[1]

        # maybe the app is running already?
        if self.get_running_applications_by_desktop_file(desktop_file):
            return True

        # One-element lists are used as mutable cells, because the nested
        # callbacks below cannot rebind names in the enclosing scope.
        found_app = [True]
        timeout_id = [None]
        gobject_loop = GLib.MainLoop()

        # Callback watching the ViewOpened signal:
        def on_view_added(bamf_path, name):
            if bamf_path.split('/')[-1].startswith('application'):
                app = BamfApplication(bamf_path)
                if desktop_file == os.path.split(app.desktop_file)[1]:
                    gobject_loop.quit()

        # ...and one for when the user-defined timeout has been reached:
        def on_timeout_reached():
            # Returning False removes this source, so forget its id to avoid
            # removing it a second time during cleanup.
            timeout_id[0] = None
            found_app[0] = False
            gobject_loop.quit()
            return False

        signal_match = get_session_bus().add_signal_receiver(
            on_view_added, 'ViewOpened')
        try:
            # A negative timeout means "wait forever": no timer is armed.
            if timeout >= 0:
                timeout_id[0] = GLib.timeout_add(
                    int(timeout * 1000), on_timeout_reached)
            # pump the main loop until either the correct signal is emitted,
            # or the timeout happens.
            gobject_loop.run()
        finally:
            # Always detach the handlers, so they neither pile up across calls
            # nor fire after this method has returned.
            signal_match.remove()
            if timeout_id[0] is not None:
                GLib.source_remove(timeout_id[0])
        return found_app[0]

    def launch_application(self, desktop_file, files=None, wait=True):
        """Launch an application by specifying a desktop file.

        :param files: List of files to pass to the application. *Not all
         apps support this.* Omitting it (or passing None) launches the
         application with no files.
        :type files: List of strings

        .. note:: If `wait` is True, this method will wait up to 10 seconds for
         the application to appear in the BAMF model.

        :raises: **TypeError** on invalid *files* parameter.

        :return: The Gobject process object.

        """
        if files is None:
            # Fresh list per call; a shared mutable default would leak state
            # between calls.
            files = []
        elif not isinstance(files, list):
            raise TypeError("files must be a list.")
        proc = Gio.DesktopAppInfo.new(desktop_file)
        # FIXME: second item is a GEerror
        proc.launch_uris(files, None)
        if wait:
            self.wait_until_application_is_running(desktop_file, 10)
        return proc
class BamfApplication(object):
    """Represents an application, with information as returned by Bamf.

    .. important:: Don't instantiate this class yourself. instead, use the
     methods as provided by the Bamf class.

    :raises: **dbus.DBusException** in the case of a DBus error.

    """

    def __init__(self, bamf_app_path):
        self.bamf_app_path = bamf_app_path
        try:
            self._app_proxy = get_session_bus().get_object(
                _BAMF_BUS_NAME, bamf_app_path)
            self._view_iface = dbus.Interface(
                self._app_proxy, 'org.ayatana.bamf.view')
            self._app_iface = dbus.Interface(
                self._app_proxy, 'org.ayatana.bamf.application')
        except dbus.DBusException as e:
            # Attach the offending object path to the exception arguments so
            # that it travels with the error, then re-raise the same exception.
            e.args += ('bamf_app_path=%r' % (bamf_app_path,),)
            raise

    @property
    def desktop_file(self):
        """Get the application desktop file.

        This just returns the filename, not the full path.
        If the application no longer exists, this returns an empty string.

        """
        try:
            return os.path.split(self._app_iface.DesktopFile())[1]
        except dbus.DBusException:
            return ""

    @property
    def name(self):
        """Get the application name.

        .. note:: This may change according to the current locale. If you want a
         unique string to match applications against, use the desktop_file
         instead.

        """
        return self._view_iface.Name()

    @property
    def icon(self):
        """Get the application icon.

        :return: The name of the icon.

        """
        return self._view_iface.Icon()

    @property
    def is_active(self):
        """Is the application active (i.e.- has keyboard focus)?"""
        return self._view_iface.IsActive()

    @property
    def is_urgent(self):
        """Is the application currently signalling urgency?"""
        return self._view_iface.IsUrgent()

    @property
    def user_visible(self):
        """Is this application visible to the user?

        .. note:: Some applications (such as the panel) are hidden to the user
         but will still be returned by bamf.

        """
        return self._view_iface.UserVisible()

    def get_windows(self):
        """Get a list of the application windows."""
        return [BamfWindow(w) for w in self._view_iface.Children()]

    def __repr__(self):
        return "<BamfApplication '%s'>" % (self.name)

    def __eq__(self, other):
        """Two applications are equal when their desktop files match.

        Comparison against anything that is not a BamfApplication (including
        None) is left to Python's default handling instead of raising.

        """
        if not isinstance(other, BamfApplication):
            return NotImplemented
        return self.desktop_file == other.desktop_file

    def __ne__(self, other):
        """Inverse of ``__eq__`` (Python 2 does not derive it automatically)."""
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal
class BamfWindow(object):
    """Represents an application window, as returned by Bamf.

    .. important:: Don't instantiate this class yourself. Instead, use the
     appropriate methods in BamfApplication.

    """

    def __init__(self, window_path):
        self._bamf_win_path = window_path
        self._app_proxy = get_session_bus().get_object(
            _BAMF_BUS_NAME, window_path)
        self._window_iface = dbus.Interface(
            self._app_proxy, 'org.ayatana.bamf.window')
        self._view_iface = dbus.Interface(
            self._app_proxy, 'org.ayatana.bamf.view')
        # The X window id is read once here; x_id returns this stored value.
        self._xid = int(self._window_iface.GetXid())
        self._x_root_win = get_display().screen().root
        self._x_win = get_display().create_resource_object('window', self._xid)

    @property
    def x_id(self):
        """Get the X11 Window Id."""
        return self._xid

    @property
    def x_win(self):
        """Get the X11 window object of the underlying window."""
        return self._x_win

    @property
    def name(self):
        """Get the window name.

        .. note:: This may change according to the current locale. If you want a
         unique string to match windows against, use the x_id instead.

        """
        return self._view_iface.Name()

    @property
    def title(self):
        """Get the window title.

        This may be different from the application name.

        .. note:: This may change depending on the current locale.

        :return: The value of the ``_NET_WM_NAME`` property, or None if the
         window does not have that property.

        """
        return self._getProperty('_NET_WM_NAME')

    @property
    def geometry(self):
        """Get the geometry for this window.

        :return: Tuple containing (x, y, width, height).

        """
        # Note: MUST import these here, rather than at the top of the file. Why?
        # Because sphinx imports these modules to build the API documentation,
        # which in turn tries to import Gdk, which in turn fails because there's
        # no DISPLAY environment set in the package builder.
        from gi.repository import GdkX11
        # FIXME: We need to use the gdk window here to get the real coordinates
        # NOTE(review): get_display() returns the python-xlib display created
        # in this module, which is what gets handed to GdkX11 below - confirm
        # that this is the display object GdkX11 expects.
        geometry = self._x_win.get_geometry()
        origin = GdkX11.X11Window.foreign_new_for_display(
            get_display(), self._xid).get_origin()
        return (origin[0], origin[1], geometry.width, geometry.height)

    @property
    def is_maximized(self):
        """Is the window maximized?

        Maximized in this case means both maximized vertically and
        horizontally. If a window is only maximized in one direction it is not
        considered maximized.

        """
        win_state = self._get_window_states()
        return ('_NET_WM_STATE_MAXIMIZED_VERT' in win_state and
                '_NET_WM_STATE_MAXIMIZED_HORZ' in win_state)

    @property
    def application(self):
        """Get the application that owns this window.

        This method may return None if the window does not have an associated
        application. The 'desktop' window is one such example.

        """
        # BAMF returns a list of parents since some windows don't have an
        # associated application. For these windows we return none.
        parents = self._view_iface.Parents()
        if parents:
            return BamfApplication(parents[0])
        return None

    @property
    def user_visible(self):
        """Is this window visible to the user in the switcher?"""
        return self._view_iface.UserVisible()

    @property
    def is_hidden(self):
        """Is this window hidden?

        Windows are hidden when the 'Show Desktop' mode is activated.

        """
        win_state = self._get_window_states()
        return '_NET_WM_STATE_HIDDEN' in win_state

    @property
    def is_focused(self):
        """Is this window focused?"""
        win_state = self._get_window_states()
        return '_NET_WM_STATE_FOCUSED' in win_state

    @property
    def is_valid(self):
        """Is this window object valid?

        Invalid windows are caused by windows closing during the construction of
        this object instance.

        """
        return self._x_win is not None

    @property
    def monitor(self):
        """Returns the monitor to which the windows belongs to"""
        return self._window_iface.Monitor()

    @property
    def closed(self):
        """Returns True if the window has been closed"""
        # A DBus failure while asking for the xid is treated as "closed".
        # Otherwise the window counts as closed only if the xid reported now
        # differs from the one recorded at construction.
        try:
            return (self._window_iface.GetXid() != self.x_id)
        except dbus.DBusException:
            return True

    def close(self):
        """Close the window."""
        self._setProperty('_NET_CLOSE_WINDOW', [0, 0])

    def set_focus(self):
        """Give this window the X input focus and raise it above its siblings."""
        self._x_win.set_input_focus(X.RevertToParent, X.CurrentTime)
        self._x_win.configure(stack_mode=X.Above)

    def __repr__(self):
        return "<BamfWindow '%s' Xid: %d>" % (
            self.title if self._x_win else '', self.x_id)

    def _getProperty(self, _type):
        """Get an X11 property of this window.

        _type is a string naming the property (for example '_NET_WM_NAME').

        :return: The property value, or None if the window does not have the
         property.

        """
        atom = self._x_win.get_full_property(
            get_display().get_atom(_type), X.AnyPropertyType)
        if atom:
            return atom.value
        return None

    def _setProperty(self, _type, data, mask=None):
        """Send a client message of type *_type* carrying *data* to the root window.

        :param _type: Name of the atom to use as the message type.
        :param data: Either a string (sent in 8-bit format) or a sequence of
         integers (sent in 32-bit format, padded or truncated to 5 items).
        :param mask: Event mask for the send; when not given, substructure
         redirect and notify are used.

        """
        if isinstance(data, str):
            dataSize = 8
        else:
            # data length must be 5 - pad with 0's if it's short, truncate
            # otherwise. list() lets callers pass a tuple as well as a list.
            data = (list(data) + [0] * 5)[:5]
            dataSize = 32

        ev = protocol.event.ClientMessage(
            window=self._x_win,
            client_type=get_display().get_atom(_type),
            data=(dataSize, data))

        if not mask:
            mask = (X.SubstructureRedirectMask | X.SubstructureNotifyMask)
        self._x_root_win.send_event(ev, event_mask=mask)
        get_display().sync()

    def _get_window_states(self):
        """Return a list of strings representing the current window state.

        A window without a ``_NET_WM_STATE`` property yields an empty list.

        """
        get_display().sync()
        # _getProperty() returns None when the property is missing; treat that
        # as "no states" instead of trying to iterate over None.
        state_atoms = self._getProperty('_NET_WM_STATE') or []
        atom_name = get_display().get_atom_name
        return [atom_name(atom) for atom in state_atoms]