1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """\
21 X2go reverse SSH/Paramiko tunneling provides X2go sound, X2go printing and
22 X2go sshfs for folder sharing and mounting remote devices in X2go terminal
23 server sessions.
24
25 """
26 __NAME__ = 'x2gorevtunnel-pylib'
27
28
29 import copy
30 import threading
31 import gevent
32 import paramiko
33
34
35 from gevent import select, socket, Timeout
36 from gevent import sleep as gevent_sleep
37
38
39 import log
40
41
43 """\
44 An X2go customized TCP handler for the Paramiko/SSH C{Transport()} class.
45
46 Incoming channels will be put into Paramiko's default accept queue. This corresponds to
47 the default behaviour of Paramiko's C{Transport} class.
48
49 However, additionally this handler function checks the server port of the incoming channel
50 and detects if there are Paramiko/SSH reverse forwarding tunnels waiting for the incoming
51 channels. The Paramiko/SSH reverse forwarding tunnels are initiated by an L{X2goSession} instance
52 (currently supported: reverse tunneling auf audio data, reverse tunneling of SSH requests).
53
54 If the server port of an incoming Paramiko/SSH channel matches the configured port of an L{X2goRevFwTunnel}
55 instance, this instance gets notified of the incoming channel and a new L{X2goRevFwChannelThread} is
56 started. This L{X2goRevFwChannelThread} then takes care of the new channel's incoming data stream.
57
58 """
59 transport = chan.get_transport()
60 transport._queue_incoming_channel(chan)
61 rev_tuns = transport.reverse_tunnels
62
63 for session_name in rev_tuns.keys():
64
65 if int(server_port) in [ int(tunnel[0]) for tunnel in rev_tuns[session_name].values() ]:
66
67 if rev_tuns[session_name]['snd'] is not None and int(server_port) == int(rev_tuns[session_name]['snd'][0]):
68 rev_tuns[session_name]['snd'][1].notify()
69
70 elif rev_tuns[session_name]['sshfs'] is not None and int(server_port) == int(rev_tuns[session_name]['sshfs'][0]):
71 rev_tuns[session_name]['sshfs'][1].notify()
72
73
75 """\
76 L{X2goRevFwTunnel} class objects are used to reversely tunnel
77 X2go audio, X2go printing and X2go folder sharing / device mounting
78 through Paramiko/SSH.
79
80 """
81 - def __init__(self, server_port, remote_host, remote_port, ssh_transport, session_instance=None, logger=None, loglevel=log.loglevel_DEFAULT):
82 """\
83 Setup a reverse tunnel through Paramiko/SSH.
84
85 After the reverse tunnel has been setup up with L{X2goRevFwTunnel.start()} it waits
86 for notification from L{X2goRevFwTunnel.notify()} to accept incoming channels. This
87 notification (L{X2goRevFwTunnel.notify()} gets called from within the transport's
88 TCP handler function L{x2go_transport_tcp_handler} of the L{X2goSession} instance.
89
90 @param server_port: the TCP/IP port on the X2go server (starting point of the tunnel),
91 normally some number above 30000
92 @type server_port: int
93 @param remote_host: the target address for reversely tunneled traffic. With X2go this should
94 always be set to the localhost (IPv4) address.
95 @type remote_host: str
96 @param remote_port: the TCP/IP port on the X2go client (end point of the tunnel),
97 normally an application's standard port (22 for SSH, 4713 for pulse audio, etc.)
98 @type remote_port: int
99 @param ssh_transport: the L{X2goSession}'s Paramiko/SSH transport instance
100 @type ssh_transport: C{paramiko.Transport} instance
101 @param logger: you can pass an L{X2goLogger} object to the
102 L{X2goRevFwTunnel} constructor
103 @type logger: L{X2goLogger} instance
104 @param loglevel: if no L{X2goLogger} object has been supplied a new one will be
105 constructed with the given loglevel
106 @type loglevel: int
107
108 """
109 if logger is None:
110 self.logger = log.X2goLogger(loglevel=loglevel)
111 else:
112 self.logger = copy.deepcopy(logger)
113 self.logger.tag = __NAME__
114
115 self.server_port = server_port
116 self.remote_host = remote_host
117 self.remote_port = remote_port
118 self.ssh_transport = ssh_transport
119 self.session_instance = session_instance
120
121 self.open_channels = {}
122 self.incoming_channel = threading.Condition()
123
124 threading.Thread.__init__(self)
125 self.daemon = True
126 self._accept_channels = True
127
135
137 """\
138 Cancel a port forwarding request. This cancellation request is sent to the server and
139 on the server the port forwarding should be unregistered.
140
141 @param address: remote server address
142 @type address: C{str}
143 @param port: remote port
144 @type port: C{int}
145
146 """
147 timeout = Timeout(10)
148 timeout.start()
149 try:
150 self.ssh_transport.cancel_port_forward('', self.server_port)
151 except:
152 pass
153 finally:
154 timeout.cancel()
155
157 """\
158 Prevent acceptance of new incoming connections through the Paramiko/SSH
159 reverse forwarding tunnel. Also, any active connection on this L{X2goRevFwTunnel}
160 instance will be closed immediately, if this method is called.
161
162 """
163 if self._accept_channels == True:
164 self.cancel_port_forward('', self.server_port)
165 self._accept_channels = False
166 self.logger('paused thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
167
169 """\
170 Resume operation of the Paramiko/SSH reverse forwarding tunnel
171 and continue accepting new incoming connections.
172
173 """
174 if self._accept_channels == False:
175 self._accept_channels = True
176 self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
177 self.logger('resumed thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
178
180 """\
181 Notify an L{X2goRevFwTunnel} instance of an incoming Paramiko/SSH channel.
182
183 If an incoming reverse tunnel channel appropriate for this instance has
184 been detected, this method gets called from the L{X2goSession}'s transport
185 TCP handler.
186
187 The sent notification will trigger a C{thread.Condition()} waiting for notification
188 in L{X2goRevFwTunnel.run()}.
189
190 """
191 self.incoming_channel.acquire()
192 self.logger('notifying thread of incoming channel: %s' % repr(self), loglevel=log.loglevel_DEBUG)
193 self.incoming_channel.notify()
194 self.incoming_channel.release()
195
197 """\
198 Stops this L{X2goRevFwTunnel} thread completely.
199
200 """
201 self.pause()
202 self._keepalive = False
203 self.logger('stopping thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
204 self.notify()
205
207 try:
208 self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
209 except paramiko.SSHException:
210
211
212 self.cancel_port_forward('', self.server_port)
213 gevent.sleep(1)
214 try:
215 self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
216 except paramiko.SSHException, e:
217 if self.session_instance:
218 self.session_instance.HOOK_rforward_request_denied(server_port=self.server_port)
219 else:
220 self.logger('Encountered SSHException: %s (for reverse TCP port forward with local destination port %s' % (str(e), self.server_port), loglevel=log.loglevel_WARN)
221
223 """\
224 This method gets run once an L{X2goRevFwTunnel} has been started with its
225 L{start()} method. Use L{X2goRevFwTunnel}.stop_thread() to stop the
226 reverse forwarding tunnel again. You can also temporarily lock the tunnel
227 down with L{X2goRevFwTunnel.pause()} and L{X2goRevFwTunnel.resume()}).
228
229 L{X2goRevFwTunnel.run()} waits for notifications of an appropriate incoming
230 Paramiko/SSH channel (issued by L{X2goRevFwTunnel.notify()}). Appropriate in
231 this context means, that its start point on the X2go server matches the class's
232 property C{server_port}.
233
234 Once a new incoming channel gets announced by the L{notify()} method, a new
235 L{X2goRevFwChannelThread} instance will be initialized. As a data stream handler,
236 the function L{x2go_rev_forward_channel_handler()} will be used.
237
238 The channel will last till the connection gets dropped on the X2go server side or
239 until the tunnel gets paused by an L{X2goRevFwTunnel.pause()} call or stopped via the
240 L{X2goRevFwTunnel.stop_thread()} method.
241
242 """
243 self._request_port_forwarding()
244 self._keepalive = True
245 while self._keepalive:
246
247 self.incoming_channel.acquire()
248
249 self.logger('waiting for incoming data channel on X2go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
250 self.incoming_channel.wait()
251
252 if self._keepalive:
253 self.logger('detected incoming data channel on X2go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
254 _chan = self.ssh_transport.accept()
255 self.logger('data channel %s for server port [127.0.0.1]:%s is up' % (_chan, self.server_port), loglevel=log.loglevel_DEBUG)
256 else:
257 self.logger('closing down rev forwarding tunnel on remote end [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
258
259 self.incoming_channel.release()
260 if self._accept_channels and self._keepalive:
261 _new_chan_thread = X2goRevFwChannelThread(_chan, (self.remote_host, self.remote_port),
262 target=x2go_rev_forward_channel_handler,
263 kwargs={
264 'chan': _chan,
265 'addr': self.remote_host,
266 'port': self.remote_port,
267 'parent_thread': self,
268 'logger': self.logger,
269 }
270 )
271 _new_chan_thread.start()
272 self.open_channels['[%s]:%s' % _chan.origin_addr] = _new_chan_thread
273
274
276 """\
277 Handle the data stream of a requested channel that got set up by a L{X2goRevFwTunnel} (Paramiko/SSH
278 reverse forwarding tunnel).
279
280 The channel (and the corresponding connections) close either ...
281
282 - ... if the connecting application closes the connection and thus, drops
283 the channel, or
284 - ... if the L{X2goRevFwTunnel} parent thread gets paused. The call
285 of L{X2goRevFwTunnel.pause()} on the instance can be used to shut down all incoming
286 tunneled SSH connections associated to this L{X2goRevFwTunnel} instance
287 from within a Python X2go application.
288
289 @param chan: channel
290 @type chan: C{class}
291 @param addr: bind address
292 @type addr: C{str}
293 @param port: bind port
294 @type port: C{int}
295 @param parent_thread: the calling L{X2goRevFwTunnel} instance
296 @type parent_thread: L{X2goRevFwTunnel} instance
297 @param logger: you can pass an L{X2goLogger} object to the
298 L{X2goRevFwTunnel} constructor
299 @type logger: L{X2goLogger} instance
300
301 """
302 fw_socket = socket.socket()
303 if logger is None:
304 def _dummy_logger(msg, l):
305 pass
306 logger = _dummy_logger
307
308 try:
309 fw_socket.connect((addr, port))
310 except Exception, e:
311 logger('Reverse forwarding request to %s:%d failed: %r' % (addr, port, e), loglevel=log.loglevel_INFO)
312 return
313
314 logger('Connected! Reverse tunnel open %r -> %r -> %r' % (chan.origin_addr,
315 chan.getpeername(), (addr, port)),
316 loglevel=log.loglevel_INFO)
317 while parent_thread._accept_channels:
318 r, w, x = select.select([fw_socket, chan], [], [])
319 if fw_socket in r:
320 data = fw_socket.recv(1024)
321 if len(data) == 0:
322 break
323 chan.send(data)
324 if chan in r:
325 data = chan.recv(1024)
326 if len(data) == 0:
327 break
328 fw_socket.send(data)
329
330 chan.close()
331 fw_socket.close()
332 logger('Reverse tunnel %s closed from %r' % (chan, chan.origin_addr,), loglevel=log.loglevel_INFO)
333
334
336 """\
337 Starts a thread for each incoming Paramiko/SSH data channel trough the reverse
338 forwarding tunnel.
339
340 """
341 - def __init__(self, channel, remote=None, **kwargs):
342 """\
343 Initializes a reverse forwarding channel thread.
344
345 @param channel: incoming Paramiko/SSH channel from the L{X2goSession}'s transport
346 accept queue
347 @type channel: class
348 @param remote: tuple (addr, port) that specifies the data endpoint of the channel
349 @type remote: C{tuple(str, int)}
350
351 """
352 self.channel = channel
353 if remote is not None:
354 self.remote_host = remote[0]
355 self.remote_port = remote[1]
356 threading.Thread.__init__(self, **kwargs)
357 self.daemon = True
358