Package logilab :: Package common :: Module umessage
[frames] | no frames]

Source Code for Module logilab.common.umessage

  1  """Unicode email support (extends email from stdlib). 
  2   
  3  :copyright: 2000-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  4  :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  5  :license: General Public License version 2 - http://www.gnu.org/licenses 
  6  """ 
  7  __docformat__ = "restructuredtext en" 
  8   
  9  import email 
 10  from encodings import search_function 
 11  from email.Utils import parseaddr, parsedate 
 12  from email.Header import decode_header 
 13  from datetime import datetime 
 14   
 15  try: 
 16      from mx.DateTime import DateTime 
 17  except ImportError: 
 18      DateTime = datetime 
 19   
 20  import logilab.common as lgc 
 21   
 22   
def decode_QP(string):
    """Decode a raw (possibly RFC 2047 encoded) header value into unicode.

    The value is split with `decode_header` and every chunk is decoded with
    the charset it declares, undecodable bytes being replaced.  A chunk
    declaring no charset, or a charset for which no codec can be found, is
    decoded as iso-8859-15 instead of raising `LookupError`.  The decoded
    chunks are joined with a single space.

    :param string: the raw header value
    :return: the decoded header value, as a unicode string
    """
    fallback = 'iso-8859-15'
    parts = []
    for decoded, charset in decode_header(string):
        decode = getattr(decoded, 'decode', None)
        if decode is None:
            # no `decode` method: the chunk is already text, keep it as is
            parts.append(decoded)
            continue
        try:
            parts.append(decode(charset or fallback, 'replace'))
        except LookupError:
            # the header names a charset we have no codec for: a bogus
            # charset label should not make the whole header unreadable
            parts.append(decode(fallback, 'replace'))
    return u' '.join(parts)
31
def message_from_file(fd):
    """Parse the email read from the file object `fd`.

    :param fd: an open file-like object containing the raw message
    :return: a `UMessage` wrapping the parsed message, or the empty string
      if the parser raises `MessageParseError`
    """
    try:
        parsed = email.message_from_file(fd)
    except email.Errors.MessageParseError:
        return ''
    return UMessage(parsed)
37
def message_from_string(string):
    """Parse the email contained in `string`.

    :param string: the raw message text
    :return: a `UMessage` wrapping the parsed message, or the empty string
      if the parser raises `MessageParseError`
    """
    try:
        parsed = email.message_from_string(string)
    except email.Errors.MessageParseError:
        return ''
    return UMessage(parsed)
43
class UMessage:
    """Encapsulates an email.Message instance and returns only unicode objects.

    The wrapped message stays reachable through the `message` attribute.
    """

    def __init__(self, message):
        """Wrap `message` (the parsed email object to delegate to)."""
        self.message = message

    # email.Message interface #################################################

    def get(self, header, default=None):
        """Return the decoded value of `header`.

        An empty or missing value is returned as is (`default` when the
        header is absent), without being decoded.
        """
        value = self.message.get(header, default)
        if value:
            return decode_QP(value)
        return value

    def get_all(self, header, default=()):
        """Return the list of decoded values of every `header` field.

        `None` values are skipped.  A `None` result from the wrapped message
        (header missing and `default=None`) yields an empty list rather than
        a `TypeError`.
        """
        values = self.message.get_all(header, default) or ()
        return [decode_QP(val) for val in values if val is not None]

    def get_payload(self, index=None, decode=False):
        """Return the payload of the message.

        * with an `index`, the sub-part at that index wrapped in a `UMessage`
        * without index, on a list payload, the list of wrapped sub-parts
        * without index, on a non 'text' content, the payload untouched
        * otherwise the payload as unicode, decoded with the content charset
          (iso-8859-1 when it is missing or no codec is found for it)
        """
        message = self.message
        if index is not None:
            return UMessage(message.get_payload(index, decode))
        payload = message.get_payload(index, decode)
        if isinstance(payload, list):
            return [UMessage(msg) for msg in payload]
        if message.get_content_maintype() != 'text':
            return payload
        charset = message.get_content_charset() or 'iso-8859-1'
        if search_function(charset) is None:
            # unknown charset name: fall back instead of failing
            charset = 'iso-8859-1'
        return unicode(payload or '', charset, "replace")

    def is_multipart(self):
        """Tell whether the wrapped message is multipart."""
        return self.message.is_multipart()

    def get_boundary(self):
        """Return the boundary of the wrapped message."""
        return self.message.get_boundary()

    def walk(self):
        """Yield every part found by walking the wrapped message, each one
        wrapped in a `UMessage`.
        """
        for part in self.message.walk():
            yield UMessage(part)

    def get_content_maintype(self):
        """Return the main content type as unicode."""
        return unicode(self.message.get_content_maintype())

    def get_content_type(self):
        """Return the content type as unicode."""
        return unicode(self.message.get_content_type())

    def get_filename(self, failobj=None):
        """Return the message's filename as unicode.

        `failobj` is returned untouched when the wrapped message gives it
        back; a filename which can't be converted to unicode is replaced by
        an error string.
        """
        value = self.message.get_filename(failobj)
        if value is failobj:
            return value
        try:
            return unicode(value)
        except UnicodeDecodeError:
            return u'error decoding filename'

    # other convenience methods ###############################################

    def headers(self):
        """return an unicode string containing all the message's headers,
        one "name: value" line per header field
        """
        lines = []
        # iterate on items() rather than keys() + get(): a lookup by name
        # only ever gives the first field of that name, so a header present
        # several times would be listed with its first value repeated
        for header, value in self.message.items():
            if value:
                value = decode_QP(value)
            lines.append(u'%s: %s' % (header, value))
        return u'\n'.join(lines)

    def multi_addrs(self, header):
        """return a list of 2-uple (name, address) for the given address (which
        is expected to be an header containing address such as from, to, cc...)
        """
        return [parseaddr(person) for person in self.get_all(header, ())]

    def date(self, alternative_source=False, return_str=False):
        """return a datetime object for the email's date or None if no date is
        set or if it can't be parsed

        :param alternative_source: when there is no date header, look for a
          date in the third space separated field of the unix "From " line
        :param return_str: return the raw date string instead of None when
          the date can't be turned into a date object
        """
        value = self.get('date')
        if value is None and alternative_source:
            unix_from = self.message.get_unixfrom()
            if unix_from is not None:
                try:
                    value = unix_from.split(" ", 2)[2]
                except IndexError:
                    pass
        if value is None:
            return None
        datetuple = parsedate(value)
        if datetuple:
            if lgc.USE_MX_DATETIME:
                factory = DateTime
            else:
                factory = datetime
            try:
                return factory(*datetuple[:6])
            except ValueError:
                # the string was split into fields but they don't make a
                # valid date (e.g. day 32): handle it as an unparseable date
                # NOTE(review): only ValueError is handled; confirm which
                # exception mx.DateTime raises for out of range fields
                pass
        if return_str:
            return value
        return None
145