Package ldaptor :: Package protocols :: Module pureber
[hide private]
[frames] | no frames]

Source Code for Module ldaptor.protocols.pureber

  1  # Twisted, the Framework of Your Internet 
  2  # Copyright (C) 2001 Matthew W. Lefkowitz 
  3  # 
  4  # This library is free software; you can redistribute it and/or 
  5  # modify it under the terms of version 2.1 of the GNU Lesser General Public 
  6  # License as published by the Free Software Foundation. 
  7  # 
  8  # This library is distributed in the hope that it will be useful, 
  9  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 10  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 11  # Lesser General Public License for more details. 
 12  # 
 13  # You should have received a copy of the GNU Lesser General Public 
 14  # License along with this library; if not, write to the Free Software 
 15  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 16   
 17  """Pure, simple, BER encoding and decoding""" 
 18   
 19  # This BER library is currently aimed at supporting LDAP, thus 
 20  # the following restrictions from RFC2251 apply: 
 21  # 
 22  # (1) Only the definite form of length encoding will be used. 
 23  # 
 24  # (2) OCTET STRING values will be encoded in the primitive form 
 25  #     only. 
 26  # 
 27  # (3) If the value of a BOOLEAN type is true, the encoding MUST have 
 28  #     its contents octets set to hex "FF". 
 29  # 
 30  # (4) If a value of a type is its default value, it MUST be absent. 
 31  #     Only some BOOLEAN and INTEGER types have default values in 
 32  #     this protocol definition. 
 33   
 34   
 35  import string 
 36   
 37  # xxxxxxxx 
 38  # |/|\.../ 
 39  # | | | 
 40  # | | tag 
 41  # | | 
 42  # | primitive (0) or structured (1) 
 43  # | 
 44  # class 
 45   
 46  CLASS_MASK              = 0xc0 
 47  CLASS_UNIVERSAL = 0x00 
 48  CLASS_APPLICATION       = 0x40 
 49  CLASS_CONTEXT           = 0x80 
 50  CLASS_PRIVATE           = 0xc0 
 51   
 52  STRUCTURED_MASK         = 0x20 
 53  STRUCTURED              = 0x20 
 54  NOT_STRUCTURED          = 0x00 
 55   
 56  TAG_MASK                = 0x1f 
 57   
 58  # LENGTH 
 59  # 0xxxxxxx = 0..127 
 60  # 1xxxxxxx = len is stored in the next 0xxxxxxx octets 
 61  # indefinite form not supported 
 62   
63 -class UnknownBERTag(Exception):
64 - def __init__(self, tag, context):
65 Exception.__init__(self) 66 self.tag = tag 67 self.context = context
68
69 - def __str__(self):
70 return "BERDecoderContext has no tag 0x%02x: %s" \ 71 % (self.tag, self.context)
72 73 import UserList 74
75 -def berDecodeLength(m, offset=0):
76 """ 77 Return a tuple of (length, lengthLength). 78 m must be atleast one byte long. 79 """ 80 l=ber2int(m[offset+0]) 81 ll=1 82 if l&0x80: 83 ll=1+(l&0x7F) 84 need(m, offset+ll) 85 l=ber2int(m[offset+1:offset+ll], signed=0) 86 return (l, ll)
87
88 -def int2berlen(i):
89 assert i>=0 90 e=int2ber(i, signed=False) 91 if i <= 127: 92 return e 93 else: 94 l=len(e) 95 assert l>0 96 assert l<=127 97 return chr(0x80|l) + e
98
99 -def int2ber(i, signed=True):
100 encoded='' 101 while ((signed and (i>127 or i<-128)) 102 or (not signed and (i>255))): 103 encoded=chr(i%256)+encoded 104 i=i>>8 105 encoded=chr(i%256)+encoded 106 return encoded
107
108 -def ber2int(e, signed=True):
109 need(e, 1) 110 v=0L+ord(e[0]) 111 if v&0x80 and signed: 112 v=v-256 113 for i in range(1, len(e)): 114 v=(v<<8) | ord(e[i]) 115 return v
116
117 -class BERBase:
118 tag = None 119
120 - def identification(self):
121 return self.tag
122
123 - def __init__(self, tag=None):
124 if tag is not None: 125 self.tag=tag
126
127 - def __len__(self):
128 return len(str(self))
129
130 - def __cmp__(self, other):
131 if isinstance(other, BERBase): 132 return cmp(str(self), str(other)) 133 else: 134 return -1
135
136 - def __eq__(self, other):
137 if isinstance(other, BERBase): 138 return str(self) == str(other) 139 else: 140 return False
141
142 - def __ne__(self, other):
143 if isinstance(other, BERBase): 144 return str(self) != str(other) 145 else: 146 return False
147
148 -class BERStructured(BERBase):
149 - def identification(self):
150 return STRUCTURED|self.tag
151
152 -class BERException(Exception): pass
153
154 -class BERExceptionInsufficientData(Exception): pass
155
156 -def need(buf, n):
157 d=n-len(buf) 158 if d>0: 159 raise BERExceptionInsufficientData, d
160
161 -class BERInteger(BERBase):
162 tag = 0x02 163 value = None 164
165 - def fromBER(klass, tag, content, berdecoder=None):
166 assert len(content)>0 167 value=ber2int(content) 168 r = klass(value=value, tag=tag) 169 return r
170 fromBER = classmethod(fromBER) 171
172 - def __init__(self, value=None, tag=None):
173 """Create a new BERInteger object. 174 value is an integer. 175 """ 176 BERBase.__init__(self, tag) 177 assert value is not None 178 self.value=value
179
180 - def __str__(self):
181 encoded=int2ber(self.value) 182 return chr(self.identification()) \ 183 +int2berlen(len(encoded)) \ 184 +encoded
185
186 - def __repr__(self):
187 if self.tag==self.__class__.tag: 188 return self.__class__.__name__+"(value=%r)"%self.value 189 else: 190 return self.__class__.__name__+"(value=%r, tag=%d)" \ 191 %(self.value, self.tag)
192
193 -class BEROctetString(BERBase):
194 tag = 0x04 195 196 value = None 197
198 - def fromBER(klass, tag, content, berdecoder=None):
199 assert len(content)>=0 200 r = klass(value=content, tag=tag) 201 return r
202 fromBER = classmethod(fromBER) 203
204 - def __init__(self, value=None, tag=None):
205 BERBase.__init__(self, tag) 206 assert value is not None 207 self.value=value
208
209 - def __str__(self):
210 value = str(self.value) 211 return chr(self.identification()) \ 212 +int2berlen(len(value)) \ 213 +value
214
215 - def __repr__(self):
216 if self.tag==self.__class__.tag: 217 return self.__class__.__name__+"(value=%s)" \ 218 %repr(self.value) 219 else: 220 return self.__class__.__name__ \ 221 +"(value=%s, tag=%d)" \ 222 %(repr(self.value), self.tag)
223
224 -class BERNull(BERBase):
225 tag = 0x05 226
227 - def fromBER(klass, tag, content, berdecoder=None):
228 assert len(content) == 0 229 r = klass(tag=tag) 230 return r
231 fromBER = classmethod(fromBER) 232
233 - def __init__(self, tag=None):
234 BERBase.__init__(self, tag)
235
236 - def __str__(self):
237 return chr(self.identification())+chr(0)
238
239 - def __repr__(self):
240 if self.tag==self.__class__.tag: 241 return self.__class__.__name__+"()" 242 else: 243 return self.__class__.__name__+"(tag=%d)"%self.tag
244
245 -class BERBoolean(BERBase):
246 tag = 0x01 247
248 - def fromBER(klass, tag, content, berdecoder=None):
249 assert len(content) > 0 250 value = ber2int(content) 251 r = klass(value=value, tag=tag) 252 return r
253 fromBER = classmethod(fromBER) 254
255 - def __init__(self, value=None, tag=None):
256 """Create a new BERInteger object. 257 value is an integer. 258 """ 259 BERBase.__init__(self, tag) 260 assert value is not None 261 if value: 262 value=0xFF 263 self.value=value
264
265 - def __str__(self):
266 assert self.value==0 or self.value==0xFF 267 return chr(self.identification()) \ 268 +int2berlen(1) \ 269 +chr(self.value)
270
271 - def __repr__(self):
272 if self.tag==self.__class__.tag: 273 return self.__class__.__name__+"(value=%d)"%self.value 274 else: 275 return self.__class__.__name__+"(value=%d, tag=%d)" \ 276 %(self.value, self.tag)
277 278
279 -class BEREnumerated(BERInteger):
280 tag = 0x0a
281
282 -class BERSequence(BERStructured, UserList.UserList):
283 # TODO __getslice__ calls __init__ with no args. 284 tag = 0x10 285
286 - def fromBER(klass, tag, content, berdecoder=None):
287 l = berDecodeMultiple(content, berdecoder) 288 r = klass(l, tag=tag) 289 return r
290 fromBER = classmethod(fromBER) 291
292 - def __init__(self, value=None, tag=None):
293 BERStructured.__init__(self, tag) 294 UserList.UserList.__init__(self) 295 assert value is not None 296 self[:]=value
297
298 - def __str__(self):
299 r=string.join(map(str, self.data), '') 300 return chr(self.identification())+int2berlen(len(r))+r
301
302 - def __repr__(self):
303 if self.tag==self.__class__.tag: 304 return self.__class__.__name__+"(value=%s)"%repr(self.data) 305 else: 306 return self.__class__.__name__+"(value=%s, tag=%d)" \ 307 %(repr(self.data), self.tag)
308 309
310 -class BERSequenceOf(BERSequence):
311 pass
312
313 -class BERSet(BERSequence):
314 tag = 0x11 315 pass
316 317 318
319 -class BERDecoderContext:
320 Identities = { 321 BERInteger.tag: BERInteger, 322 BEROctetString.tag: BEROctetString, 323 BERNull.tag: BERNull, 324 BERBoolean.tag: BERBoolean, 325 BEREnumerated.tag: BEREnumerated, 326 BERSequence.tag: BERSequence, 327 BERSet.tag: BERSet, 328 } 329
330 - def __init__(self, fallback=None, inherit=None):
331 self.fallback=fallback 332 self.inherit_context=inherit
333
334 - def lookup_id(self, id):
335 try: 336 return self.Identities[id] 337 except KeyError: 338 if self.fallback: 339 return self.fallback.lookup_id(id) 340 else: 341 return None
342
343 - def inherit(self):
344 return self.inherit_context or self
345
346 - def __repr__(self):
347 identities = [] 348 for tag, class_ in self.Identities.items(): 349 identities.append('0x%02x: %s' % (tag, class_.__name__)) 350 return "<"+self.__class__.__name__ \ 351 +" identities={%s}" % ', '.join(identities) \ 352 +" fallback="+repr(self.fallback) \ 353 +" inherit="+repr(self.inherit_context) \ 354 +">"
355
356 -def berDecodeObject(context, m):
357 """berDecodeObject(context, string) -> (berobject, bytesUsed) 358 berobject may be None. 359 """ 360 while m: 361 need(m, 2) 362 i=ber2int(m[0], signed=0)&(CLASS_MASK|TAG_MASK) 363 364 length, lenlen = berDecodeLength(m, offset=1) 365 need(m, 1+lenlen+length) 366 m2 = m[1+lenlen:1+lenlen+length] 367 368 berclass=context.lookup_id(i) 369 if berclass: 370 inh=context.inherit() 371 assert inh 372 r = berclass.fromBER(tag=i, 373 content=m2, 374 berdecoder=inh) 375 return (r, 1+lenlen+length) 376 else: 377 #raise UnknownBERTag, (i, context) 378 print str(UnknownBERTag(i, context)) #TODO 379 return (None, 1+lenlen+length) 380 return (None, 0)
381
382 -def berDecodeMultiple(content, berdecoder):
383 """berDecodeMultiple(content, berdecoder) -> [objects] 384 385 Decodes everything in content and returns a list of decoded 386 objects. 387 388 All of content will be decoded, and content must contain complete 389 BER objects. 390 """ 391 l = [] 392 while content: 393 n, bytes = berDecodeObject(berdecoder, content) 394 if n is not None: 395 l.append(n) 396 assert bytes <= len(content) 397 content = content[bytes:] 398 return l
399 400 #TODO unimplemented classes are below: 401 402 #class BERObjectIdentifier(BERBase): 403 # tag = 0x06 404 # pass 405 406 #class BERIA5String(BERBase): 407 # tag = 0x16 408 # pass 409 410 #class BERPrintableString(BERBase): 411 # tag = 0x13 412 # pass 413 414 #class BERT61String(BERBase): 415 # tag = 0x14 416 # pass 417 418 #class BERUTCTime(BERBase): 419 # tag = 0x17 420 # pass 421 422 #class BERBitString(BERBase): 423 # tag = 0x03 424 # pass 425