1 """Test the various means of instantiating and invoking tools."""
2
3 import gzip
4 import StringIO
5 import sys
6 from httplib import IncompleteRead
7 import time
8 timeout = 0.2
9
10 import types
11 from cherrypy.test import test
12 test.prefer_parent_path()
13
14 import cherrypy
15 from cherrypy import tools
16
17
18 europoundUnicode = u'\x80\xa3'
19
28 myauthtools.check_access = cherrypy.Tool('before_request_body', check_access)
29
def numerify():
    """Wrap the response body so mapped substrings are replaced lazily.

    The replacement pairs are read from ``cherrypy.request.numerify_map``
    (an iterable of ``(old, new)`` pairs) each time a chunk is produced,
    and every pair is applied to that chunk with ``str.replace``.
    """
    def number_it(chunks):
        for piece in chunks:
            for old, new in cherrypy.request.numerify_map:
                piece = piece.replace(old, new)
            yield piece

    original_body = cherrypy.response.body
    cherrypy.response.body = number_it(original_body)
37
class NumTool(cherrypy.Tool):
    """Tool whose setup attaches two extra 'on_start_resource' hooks."""

    def _setup(self):
        """Attach this tool's hooks to the current request.

        Hooks are attached in this order:

        1. ``makemap`` at 'on_start_resource': reads the "map" entry of
           the merged tool arguments (default ``{}``) and stores its
           ``items()`` on ``cherrypy.request.numerify_map``.
        2. ``critical`` at 'on_start_resource': points the request's
           ``error_response`` at ``HTTPError(502).set_response``. It is
           flagged ``failsafe``.
        3. The tool's own callable at ``self._point``.
        """
        hooks = cherrypy.request.hooks

        def makemap():
            merged = self._merged_args()
            mapping = merged.get("map", {})
            cherrypy.request.numerify_map = mapping.items()

        def critical():
            bad_gateway = cherrypy.HTTPError(502)
            cherrypy.request.error_response = bad_gateway.set_response
        critical.failsafe = True

        hooks.attach('on_start_resource', makemap)
        hooks.attach('on_start_resource', critical)
        hooks.attach(self._point, self.callable)


tools.numerify = NumTool('before_finalize', numerify)
53
54
class NadsatTool:
    """Hand-rolled tool object (not a ``cherrypy.Tool`` subclass).

    It carries its own ``_name`` and a ``_setup`` method that attaches
    two hooks to the current request.
    """

    def __init__(self):
        self._name = "nadsat"
        # Maps a request's "id" parameter to True once cleanup() has run.
        self.ended = {}

    def nadsat(self):
        """Wrap the response body so two words are swapped in each chunk."""
        swaps = (("good", "horrorshow"), ("piece", "lomtick"))

        def nadsat_it_up(body):
            for chunk in body:
                for plain, slang in swaps:
                    chunk = chunk.replace(plain, slang)
                yield chunk

        cherrypy.response.body = nadsat_it_up(cherrypy.response.body)
    nadsat.priority = 0

    def cleanup(self):
        """Overwrite the response body and record the request's "id" param.

        Nothing is recorded when the "id" parameter is missing or falsy.
        """
        cherrypy.response.body = "razdrez"
        request_id = cherrypy.request.params.get("id")
        if not request_id:
            return
        self.ended[request_id] = True
    # NOTE(review): presumably tells CherryPy to run this hook even after
    # earlier failures -- confirm against the hook documentation.
    cleanup.failsafe = True

    def _setup(self):
        """Attach ``nadsat`` and ``cleanup`` to the current request's hooks."""
        hooks = cherrypy.request.hooks
        hooks.attach('before_finalize', self.nadsat)
        hooks.attach('on_end_request', self.cleanup)


tools.nadsat = NadsatTool()
82
def pipe_body():
    """Read the raw request body into ``cherrypy.request.body``.

    Switches off ``process_request_body`` on the request, then reads
    exactly ``Content-Length`` bytes from the request's ``rfile``.
    Raises ``KeyError`` if the Content-Length header is absent.
    """
    request = cherrypy.request
    request.process_request_body = False
    length = int(request.headers['Content-Length'])
    request.body = request.rfile.read(length)
87
88
class Rotator(object):
    """Callable that shifts each item of the response body by ``scale``."""

    def __call__(self, scale):
        """Collapse the response body, then replace it with shifted items.

        Each item yielded by the collapsed body is passed through
        ``chr(ord(item) + scale)``.
        """
        # NOTE(review): ord() needs single-character items -- confirm what
        # iterating response.body yields after collapse_body().
        response = cherrypy.response
        response.collapse_body()
        shifted = []
        for item in response.body:
            shifted.append(chr(ord(item) + scale))
        response.body = shifted


cherrypy.tools.rotator = cherrypy.Tool('before_finalize', Rotator())
95
def stream_handler(next_handler, *args, **kwargs):
    """Run ``next_handler`` and return what it wrote to ``response.output``.

    A fresh ``StringIO`` buffer is installed as ``cherrypy.response.output``
    before the wrapped handler is called. The handler's own return value
    is discarded; the accumulated buffer contents are returned instead.
    The buffer is closed whether or not the handler raises.
    """
    buf = StringIO.StringIO()
    cherrypy.response.output = buf
    try:
        next_handler(*args, **kwargs)
        return buf.getvalue()
    finally:
        buf.close()


cherrypy.tools.streamer = cherrypy._cptools.HandlerWrapperTool(stream_handler)
105
class Root:
    """Root object mounted on the CherryPy tree; its handlers are exposed."""

    def index(self):
        """Return a fixed greeting."""
        return "Howdy earth!"
    index.exposed = True

    def tarfile(self):
        """Write two fragments to ``cherrypy.response.output``."""
        out = cherrypy.response.output
        for fragment in ('I am ', 'a tarfile'):
            out.write(fragment)
    tarfile.exposed = True
    tarfile._cp_config = {'tools.streamer.on': True}

    def euro(self):
        """Check the 'before_finalize' hooks, then yield three unicode chunks.

        Asserts that the sorted hooks are 'encode' then 'gzip', with
        priorities 70 and 80 respectively.
        """
        finalize_hooks = list(cherrypy.request.hooks['before_finalize'])
        finalize_hooks.sort()
        names = [hook.callback.__name__ for hook in finalize_hooks]
        assert names == ['encode', 'gzip']
        priorities = [hook.priority for hook in finalize_hooks]
        assert priorities == [70, 80]
        for text in (u"Hello,", u"world", europoundUnicode):
            yield text
    euro.exposed = True

    def pipe(self):
        """Return ``cherrypy.request.body`` unchanged."""
        return cherrypy.request.body
    pipe.exposed = True
    # pipe_body is installed as a bare 'before_request_body' hook here.
    pipe._cp_config = {'hooks.before_request_body': pipe_body}

    def decorated_euro(self, *vpath):
        """Yield the same three unicode chunks as ``euro``."""
        for text in (u"Hello,", u"world", europoundUnicode):
            yield text
    decorated_euro.exposed = True
    # Tools applied by calling them as decorators: gzip first, then encode.
    decorated_euro = tools.gzip(compress_level=6)(decorated_euro)
    decorated_euro = tools.encode(errors='ignore')(decorated_euro)


root = Root()
144
145
class TestType(type):
    """Metaclass that exposes a class's functions and mounts it on ``root``.

    Every plain function found in the class body gets ``exposed = True``,
    and an instance of the new class is set on ``root`` under the
    lowercased class name.
    """

    def __init__(cls, name, bases, dct):
        type.__init__(cls, name, bases, dct)
        for member in dct.itervalues():
            if not isinstance(member, types.FunctionType):
                continue
            member.exposed = True
        mount_name = name.lower()
        setattr(root, mount_name, cls())


class Test(object):
    """Base class that applies ``TestType`` to itself and its subclasses."""

    __metaclass__ = TestType
158
159
160
161
class Demo(Test):
    """Handlers that run with the nadsat tool switched on."""

    _cp_config = {"tools.nadsat.on": True}

    def index(self, id=None):
        """Return a fixed sentence."""
        return "A good piece of cherry pie"

    def ended(self, id):
        """Return the repr of the nadsat tool's ``ended`` entry for ``id``."""
        ended_flags = tools.nadsat.ended
        return repr(ended_flags[id])

    def err(self, id=None):
        """Always raise ``ValueError``."""
        raise ValueError()

    def errinstream(self, id=None):
        """Yield one chunk, then raise ``ValueError`` mid-stream."""
        yield "nonconfidential"
        raise ValueError()
        # Unreachable after the raise; kept as in the original.
        yield "confidential"

    def restricted(self):
        """Return a welcome message."""
        return "Welcome!"
    # One handler, wrapped by the check_access tool, published as both
    # 'restricted' and 'userid'.
    userid = restricted = myauthtools.check_access()(restricted)

    def err_in_onstart(self):
        """Return a fixed success string."""
        return "success!"

    def stream(self, id=None):
        """Yield the decimal strings of 0 through 99999999, one per chunk."""
        for number in xrange(100000000):
            yield str(number)
    stream._cp_config = {'response.stream': True}
195
196
cherrypy.config.update({'environment': 'test_suite'})

# Per-path application config, assembled one section at a time.
conf = {}

conf['/demo'] = {
    'tools.numerify.on': True,
    'tools.numerify.map': {"pie": "3.14159"},
}
conf['/demo/restricted'] = {
    'request.show_tracebacks': False,
}
conf['/demo/userid'] = {
    'request.show_tracebacks': False,
    'myauth.check_access.default': True,
}
conf['/demo/errinstream'] = {
    'response.stream': True,
}
# NOTE(review): 'map' is a str here rather than a dict, so a hook that calls
# .items() on it cannot succeed -- presumably deliberate; confirm with the tests.
conf['/demo/err_in_onstart'] = {
    'tools.numerify.map': "pie->3.14159"
}

conf['/euro'] = {
    'tools.gzip.on': True,
    'tools.encode.on': True,
}

conf['/decorated_euro/subpath'] = {
    'tools.gzip.priority': 10,
}

conf['/tarfile'] = {'tools.streamer.on': True}

app = cherrypy.tree.mount(root, config=conf)
# Register the custom toolbox under the 'myauth' config namespace.
app.request_class.namespaces['myauth'] = myauthtools

# The py25 examples are only imported on Python 2.5 or newer.
if sys.version_info[:2] >= (2, 5):
    from cherrypy.test import py25
    root.tooldecs = py25.ToolExamples()
238
239
240
241
242 from cherrypy.test import helper
243
244
381
382
# When run as a script, call setup_server() and then helper.testmain().
if __name__ == "__main__":
    setup_server()
    helper.testmain()
386