Package x2go :: Module rforward
[frames] | no frames]

Source Code for Module x2go.rforward

  1  #!/usr/bin/env python 
  2   
  3  # Copyright (C) 2010-2011 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de> 
  4  # 
  5  # Python X2go is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU General Public License as published by 
  7  # the Free Software Foundation; either version 3 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # Python X2go is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU General Public License 
 16  # along with this program; if not, write to the 
 17  # Free Software Foundation, Inc., 
 18  # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. 
 19   
 20  """\ 
 21  X2go reverse SSH/Paramiko tunneling provides X2go sound, X2go printing and 
 22  X2go sshfs for folder sharing and mounting remote devices in X2go terminal 
 23  server sessions. 
 24   
 25  """ 
 26  __NAME__ = 'x2gorevtunnel-pylib' 
 27   
 28  # modules 
 29  import copy 
 30  import threading 
 31  import gevent 
 32  import paramiko 
 33   
 34  # gevent/greenlet 
 35  from gevent import select, socket, Timeout 
 36  from gevent import sleep as gevent_sleep 
 37   
 38  # Python X2go modules 
 39  import log 
 40   
 41   
42 -def x2go_transport_tcp_handler(chan, (origin_addr, origin_port), (server_addr, server_port)):
43 """\ 44 An X2go customized TCP handler for the Paramiko/SSH C{Transport()} class. 45 46 Incoming channels will be put into Paramiko's default accept queue. This corresponds to 47 the default behaviour of Paramiko's C{Transport} class. 48 49 However, additionally this handler function checks the server port of the incoming channel 50 and detects if there are Paramiko/SSH reverse forwarding tunnels waiting for the incoming 51 channels. The Paramiko/SSH reverse forwarding tunnels are initiated by an L{X2goSession} instance 52 (currently supported: reverse tunneling auf audio data, reverse tunneling of SSH requests). 53 54 If the server port of an incoming Paramiko/SSH channel matches the configured port of an L{X2goRevFwTunnel} 55 instance, this instance gets notified of the incoming channel and a new L{X2goRevFwChannelThread} is 56 started. This L{X2goRevFwChannelThread} then takes care of the new channel's incoming data stream. 57 58 """ 59 transport = chan.get_transport() 60 transport._queue_incoming_channel(chan) 61 rev_tuns = transport.reverse_tunnels 62 63 for session_name in rev_tuns.keys(): 64 65 if int(server_port) in [ int(tunnel[0]) for tunnel in rev_tuns[session_name].values() ]: 66 67 if rev_tuns[session_name]['snd'] is not None and int(server_port) == int(rev_tuns[session_name]['snd'][0]): 68 rev_tuns[session_name]['snd'][1].notify() 69 70 elif rev_tuns[session_name]['sshfs'] is not None and int(server_port) == int(rev_tuns[session_name]['sshfs'][0]): 71 rev_tuns[session_name]['sshfs'][1].notify()
72 73
74 -class X2goRevFwTunnel(threading.Thread):
75 """\ 76 L{X2goRevFwTunnel} class objects are used to reversely tunnel 77 X2go audio, X2go printing and X2go folder sharing / device mounting 78 through Paramiko/SSH. 79 80 """
81 - def __init__(self, server_port, remote_host, remote_port, ssh_transport, session_instance=None, logger=None, loglevel=log.loglevel_DEFAULT):
82 """\ 83 Setup a reverse tunnel through Paramiko/SSH. 84 85 After the reverse tunnel has been setup up with L{X2goRevFwTunnel.start()} it waits 86 for notification from L{X2goRevFwTunnel.notify()} to accept incoming channels. This 87 notification (L{X2goRevFwTunnel.notify()} gets called from within the transport's 88 TCP handler function L{x2go_transport_tcp_handler} of the L{X2goSession} instance. 89 90 @param server_port: the TCP/IP port on the X2go server (starting point of the tunnel), 91 normally some number above 30000 92 @type server_port: int 93 @param remote_host: the target address for reversely tunneled traffic. With X2go this should 94 always be set to the localhost (IPv4) address. 95 @type remote_host: str 96 @param remote_port: the TCP/IP port on the X2go client (end point of the tunnel), 97 normally an application's standard port (22 for SSH, 4713 for pulse audio, etc.) 98 @type remote_port: int 99 @param ssh_transport: the L{X2goSession}'s Paramiko/SSH transport instance 100 @type ssh_transport: C{paramiko.Transport} instance 101 @param logger: you can pass an L{X2goLogger} object to the 102 L{X2goRevFwTunnel} constructor 103 @type logger: L{X2goLogger} instance 104 @param loglevel: if no L{X2goLogger} object has been supplied a new one will be 105 constructed with the given loglevel 106 @type loglevel: int 107 108 """ 109 if logger is None: 110 self.logger = log.X2goLogger(loglevel=loglevel) 111 else: 112 self.logger = copy.deepcopy(logger) 113 self.logger.tag = __NAME__ 114 115 self.server_port = server_port 116 self.remote_host = remote_host 117 self.remote_port = remote_port 118 self.ssh_transport = ssh_transport 119 self.session_instance = session_instance 120 121 self.open_channels = {} 122 self.incoming_channel = threading.Condition() 123 124 threading.Thread.__init__(self) 125 self.daemon = True 126 self._accept_channels = True
127
128 - def __del__(self):
129 """\ 130 Class destructor. 131 132 """ 133 self.stop_thread() 134 self.cancel_port_forward('', self.server_port)
135
136 - def cancel_port_forward(self, address, port):
137 """\ 138 Cancel a port forwarding request. This cancellation request is sent to the server and 139 on the server the port forwarding should be unregistered. 140 141 @param address: remote server address 142 @type address: C{str} 143 @param port: remote port 144 @type port: C{int} 145 146 """ 147 timeout = Timeout(10) 148 timeout.start() 149 try: 150 self.ssh_transport.cancel_port_forward('', self.server_port) 151 except: 152 pass 153 finally: 154 timeout.cancel()
155
156 - def pause(self):
157 """\ 158 Prevent acceptance of new incoming connections through the Paramiko/SSH 159 reverse forwarding tunnel. Also, any active connection on this L{X2goRevFwTunnel} 160 instance will be closed immediately, if this method is called. 161 162 """ 163 if self._accept_channels == True: 164 self.cancel_port_forward('', self.server_port) 165 self._accept_channels = False 166 self.logger('paused thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
167
168 - def resume(self):
169 """\ 170 Resume operation of the Paramiko/SSH reverse forwarding tunnel 171 and continue accepting new incoming connections. 172 173 """ 174 if self._accept_channels == False: 175 self._accept_channels = True 176 self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler) 177 self.logger('resumed thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
178
179 - def notify(self):
180 """\ 181 Notify an L{X2goRevFwTunnel} instance of an incoming Paramiko/SSH channel. 182 183 If an incoming reverse tunnel channel appropriate for this instance has 184 been detected, this method gets called from the L{X2goSession}'s transport 185 TCP handler. 186 187 The sent notification will trigger a C{thread.Condition()} waiting for notification 188 in L{X2goRevFwTunnel.run()}. 189 190 """ 191 self.incoming_channel.acquire() 192 self.logger('notifying thread of incoming channel: %s' % repr(self), loglevel=log.loglevel_DEBUG) 193 self.incoming_channel.notify() 194 self.incoming_channel.release()
195
196 - def stop_thread(self):
197 """\ 198 Stops this L{X2goRevFwTunnel} thread completely. 199 200 """ 201 self.pause() 202 self._keepalive = False 203 self.logger('stopping thread: %s' % repr(self), loglevel=log.loglevel_DEBUG) 204 self.notify()
205
206 - def _request_port_forwarding(self):
207 try: 208 self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler) 209 except paramiko.SSHException: 210 # if port forward request fails, we try to tell the server to cancel all foregoing port forward requests on 211 # self.server_port 212 self.cancel_port_forward('', self.server_port) 213 gevent.sleep(1) 214 try: 215 self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler) 216 except paramiko.SSHException, e: 217 if self.session_instance: 218 self.session_instance.HOOK_rforward_request_denied(server_port=self.server_port) 219 else: 220 self.logger('Encountered SSHException: %s (for reverse TCP port forward with local destination port %s' % (str(e), self.server_port), loglevel=log.loglevel_WARN)
221
222 - def run(self):
223 """\ 224 This method gets run once an L{X2goRevFwTunnel} has been started with its 225 L{start()} method. Use L{X2goRevFwTunnel}.stop_thread() to stop the 226 reverse forwarding tunnel again. You can also temporarily lock the tunnel 227 down with L{X2goRevFwTunnel.pause()} and L{X2goRevFwTunnel.resume()}). 228 229 L{X2goRevFwTunnel.run()} waits for notifications of an appropriate incoming 230 Paramiko/SSH channel (issued by L{X2goRevFwTunnel.notify()}). Appropriate in 231 this context means, that its start point on the X2go server matches the class's 232 property C{server_port}. 233 234 Once a new incoming channel gets announced by the L{notify()} method, a new 235 L{X2goRevFwChannelThread} instance will be initialized. As a data stream handler, 236 the function L{x2go_rev_forward_channel_handler()} will be used. 237 238 The channel will last till the connection gets dropped on the X2go server side or 239 until the tunnel gets paused by an L{X2goRevFwTunnel.pause()} call or stopped via the 240 L{X2goRevFwTunnel.stop_thread()} method. 241 242 """ 243 self._request_port_forwarding() 244 self._keepalive = True 245 while self._keepalive: 246 247 self.incoming_channel.acquire() 248 249 self.logger('waiting for incoming data channel on X2go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG) 250 self.incoming_channel.wait() 251 252 if self._keepalive: 253 self.logger('detected incoming data channel on X2go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG) 254 _chan = self.ssh_transport.accept() 255 self.logger('data channel %s for server port [127.0.0.1]:%s is up' % (_chan, self.server_port), loglevel=log.loglevel_DEBUG) 256 else: 257 self.logger('closing down rev forwarding tunnel on remote end [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG) 258 259 self.incoming_channel.release() 260 if self._accept_channels and self._keepalive: 261 _new_chan_thread = X2goRevFwChannelThread(_chan, (self.remote_host, self.remote_port), 262 target=x2go_rev_forward_channel_handler, 263 kwargs={ 264 'chan': _chan, 265 'addr': self.remote_host, 266 'port': self.remote_port, 267 'parent_thread': self, 268 'logger': self.logger, 269 } 270 ) 271 _new_chan_thread.start() 272 self.open_channels['[%s]:%s' % _chan.origin_addr] = _new_chan_thread
273 274
275 -def x2go_rev_forward_channel_handler(chan=None, addr='', port=0, parent_thread=None, logger=None, ):
276 """\ 277 Handle the data stream of a requested channel that got set up by a L{X2goRevFwTunnel} (Paramiko/SSH 278 reverse forwarding tunnel). 279 280 The channel (and the corresponding connections) close either ... 281 282 - ... if the connecting application closes the connection and thus, drops 283 the channel, or 284 - ... if the L{X2goRevFwTunnel} parent thread gets paused. The call 285 of L{X2goRevFwTunnel.pause()} on the instance can be used to shut down all incoming 286 tunneled SSH connections associated to this L{X2goRevFwTunnel} instance 287 from within a Python X2go application. 288 289 @param chan: channel 290 @type chan: C{class} 291 @param addr: bind address 292 @type addr: C{str} 293 @param port: bind port 294 @type port: C{int} 295 @param parent_thread: the calling L{X2goRevFwTunnel} instance 296 @type parent_thread: L{X2goRevFwTunnel} instance 297 @param logger: you can pass an L{X2goLogger} object to the 298 L{X2goRevFwTunnel} constructor 299 @type logger: L{X2goLogger} instance 300 301 """ 302 fw_socket = socket.socket() 303 if logger is None: 304 def _dummy_logger(msg, l): 305 pass
306 logger = _dummy_logger 307 308 try: 309 fw_socket.connect((addr, port)) 310 except Exception, e: 311 logger('Reverse forwarding request to %s:%d failed: %r' % (addr, port, e), loglevel=log.loglevel_INFO) 312 return 313 314 logger('Connected! Reverse tunnel open %r -> %r -> %r' % (chan.origin_addr, 315 chan.getpeername(), (addr, port)), 316 loglevel=log.loglevel_INFO) 317 while parent_thread._accept_channels: 318 r, w, x = select.select([fw_socket, chan], [], []) 319 if fw_socket in r: 320 data = fw_socket.recv(1024) 321 if len(data) == 0: 322 break 323 chan.send(data) 324 if chan in r: 325 data = chan.recv(1024) 326 if len(data) == 0: 327 break 328 fw_socket.send(data) 329 330 chan.close() 331 fw_socket.close() 332 logger('Reverse tunnel %s closed from %r' % (chan, chan.origin_addr,), loglevel=log.loglevel_INFO) 333 334
335 -class X2goRevFwChannelThread(threading.Thread):
336 """\ 337 Starts a thread for each incoming Paramiko/SSH data channel trough the reverse 338 forwarding tunnel. 339 340 """
341 - def __init__(self, channel, remote=None, **kwargs):
342 """\ 343 Initializes a reverse forwarding channel thread. 344 345 @param channel: incoming Paramiko/SSH channel from the L{X2goSession}'s transport 346 accept queue 347 @type channel: class 348 @param remote: tuple (addr, port) that specifies the data endpoint of the channel 349 @type remote: C{tuple(str, int)} 350 351 """ 352 self.channel = channel 353 if remote is not None: 354 self.remote_host = remote[0] 355 self.remote_port = remote[1] 356 threading.Thread.__init__(self, **kwargs) 357 self.daemon = True
358