Package common :: Module umessage
[frames | no frames]

Source Code for Module common.umessage

  1  # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  2  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  3  # 
  4  # This file is part of logilab-common. 
  5  # 
  6  # logilab-common is free software: you can redistribute it and/or modify it under 
  7  # the terms of the GNU Lesser General Public License as published by the Free 
  8  # Software Foundation, either version 2.1 of the License, or (at your option) any 
  9  # later version. 
 10  # 
 11  # logilab-common is distributed in the hope that it will be useful, but WITHOUT 
 12  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 13  # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 14  # details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public License along 
 17  # with logilab-common.  If not, see <http://www.gnu.org/licenses/>. 
 18  """Unicode email support (extends email from stdlib)""" 
 19   
 20  __docformat__ = "restructuredtext en" 
 21   
 22  import email 
 23  from encodings import search_function 
 24  import sys 
 25  if sys.version_info >= (2, 5): 
 26      from email.utils import parseaddr, parsedate 
 27      from email.header import decode_header 
 28  else: 
 29      from email.Utils import parseaddr, parsedate 
 30      from email.Header import decode_header 
 31   
 32  from datetime import datetime 
 33   
 34  try: 
 35      from mx.DateTime import DateTime 
 36  except ImportError: 
 37      DateTime = datetime 
 38   
 39  import logilab.common as lgc 
 40   
 41   
def decode_QP(string):
    """Decode an RFC 2047 encoded header value into one unicode string.

    `string` is a raw header value, possibly containing encoded words such
    as ``=?utf-8?q?caf=C3=A9?=``. Each chunk returned by `decode_header` is
    decoded with its declared charset; chunks without a charset are decoded
    as iso-8859-15. Undecodable bytes are replaced rather than raising.
    """
    parts = []
    for decoded, charset in decode_header(string):
        # On Python 3, decode_header hands back a plain `str` when the header
        # holds no encoded word at all: there is nothing left to decode.
        if isinstance(decoded, bytes):
            try:
                decoded = decoded.decode(charset or 'iso-8859-15', 'replace')
            except LookupError:
                # Unknown charset label in the header: fall back to the
                # default charset instead of failing on the whole header.
                decoded = decoded.decode('iso-8859-15', 'replace')
        parts.append(decoded)

    if sys.version_info < (3, 3):
        # Older decode_header implementations strip the whitespace that
        # separates chunks, so it has to be put back here.
        return ' '.join(parts)
    # Python >= 3.3 keeps that whitespace inside the chunks themselves.
    return ''.join(parts)
50
def message_from_file(fd):
    """Parse the email read from the file object `fd` into a `UMessage`.

    Return an empty string when the parser raises `MessageParseError`.
    """
    # `email.Errors` is the Python 2 spelling only; looking it up on Python 3
    # raises AttributeError while the except clause is evaluated, hiding the
    # parse error. `email.errors` exists on both Python >= 2.5 and Python 3.
    from email.errors import MessageParseError
    try:
        return UMessage(email.message_from_file(fd))
    except MessageParseError:
        return ''
56
def message_from_string(string):
    """Parse the email contained in `string` into a `UMessage`.

    Return an empty string when the parser raises `MessageParseError`.
    """
    # `email.Errors` is the Python 2 spelling only; looking it up on Python 3
    # raises AttributeError while the except clause is evaluated, hiding the
    # parse error. `email.errors` exists on both Python >= 2.5 and Python 3.
    from email.errors import MessageParseError
    try:
        return UMessage(email.message_from_string(string))
    except MessageParseError:
        return ''
62
class UMessage:
    """Encapsulates an email.Message instance and returns only unicode objects.

    The wrapped message is kept in the `message` attribute; header values are
    decoded through `decode_QP` and sub-messages are wrapped in `UMessage`.
    """

    def __init__(self, message):
        """Wrap `message`, an `email.Message` instance."""
        self.message = message

    # email.Message interface #################################################

    def get(self, header, default=None):
        """return the decoded value of `header`, or `default` if it is missing

        Falsy values (None, empty string) are returned untouched.
        """
        value = self.message.get(header, default)
        if value:
            return decode_QP(value)
        return value

    def __getitem__(self, header):
        """return the decoded value of `header`, or None if it is missing"""
        return self.get(header)

    def get_all(self, header, default=()):
        """return the list of decoded values of every `header` occurrence"""
        return [decode_QP(val) for val in self.message.get_all(header, default)
                if val is not None]

    def is_multipart(self):
        """tell whether the wrapped message is multipart"""
        return self.message.is_multipart()

    def get_boundary(self):
        """return the boundary parameter of the wrapped message"""
        return self.message.get_boundary()

    def walk(self):
        """yield every part of the wrapped message tree as a `UMessage`"""
        for part in self.message.walk():
            yield UMessage(part)

    def get_payload(self, index=None, decode=False):
        """return the payload of the wrapped message

        * with `index`, the matching sub-message wrapped in a `UMessage`
        * for a multipart message, the list of parts as `UMessage` objects
        * for a non text part, the payload exactly as the wrapped message
          returns it
        * for a text part, a unicode string: a bytes payload (what
          `decode=True` yields on Python 3) is decoded with the charset
          declared by the part, falling back to iso-8859-1 when no charset is
          declared or when it is unknown to the codecs registry
        """
        message = self.message
        if index is not None:
            return UMessage(message.get_payload(index, decode))

        payload = message.get_payload(index, decode)
        if isinstance(payload, list):
            return [UMessage(msg) for msg in payload]
        if message.get_content_maintype() != 'text':
            return payload
        if not isinstance(payload, bytes):
            # already text (or None when no payload was ever set)
            return payload

        charset = message.get_content_charset() or 'iso-8859-1'
        if search_function(charset) is None:
            charset = 'iso-8859-1'
        return payload.decode(charset, 'replace')

    if sys.version_info < (3, 0):

        def get_content_maintype(self):
            """return the main content type of the message (eg 'text')"""
            return str(self.message.get_content_maintype())

        def get_content_type(self):
            """return the content type of the message (eg 'text/plain')"""
            return str(self.message.get_content_type())

        def get_filename(self, failobj=None):
            """return the filename of the message, or `failobj` if unset"""
            value = self.message.get_filename(failobj)
            if value is failobj:
                return value
            try:
                return str(value)
            except UnicodeDecodeError:
                return 'error decoding filename'

    else:

        def get_content_maintype(self):
            """return the main content type of the message (eg 'text')"""
            return self.message.get_content_maintype()

        def get_content_type(self):
            """return the content type of the message (eg 'text/plain')"""
            return self.message.get_content_type()

        def get_filename(self, failobj=None):
            """return the filename of the message, or `failobj` if unset"""
            return self.message.get_filename(failobj)

    # other convenience methods ###############################################

    def headers(self):
        """return an unicode string containing all the message's headers"""
        return '\n'.join('%s: %s' % (header, self.get(header))
                         for header in self.message.keys())

    def multi_addrs(self, header):
        """return a list of 2-uple (name, address) for the given address (which
        is expected to be an header containing address such as from, to, cc...)
        """
        return [parseaddr(person) for person in self.get_all(header, ())]

    def date(self, alternative_source=False, return_str=False):
        """return a datetime object for the email's date or None if no date is
        set or if it can't be parsed

        If `alternative_source` is true and there is no date header, the date
        is looked for in the unix "From " line of the message. If `return_str`
        is true, the raw date string is returned instead of None when it can't
        be turned into a date object.
        """
        value = self.get('date')
        if value is None and alternative_source:
            unix_from = self.message.get_unixfrom()
            if unix_from is not None:
                # presumably "From <sender> <date>": keep what follows the
                # second space, if there is anything there
                fields = unix_from.split(" ", 2)
                if len(fields) > 2:
                    value = fields[2]
        if value is None:
            return None
        datetuple = parsedate(value)
        if datetuple:
            try:
                if lgc.USE_MX_DATETIME:
                    return DateTime(*datetuple[:6])
                return datetime(*datetuple[:6])
            except ValueError:
                # parsedate does not range-check its fields (eg "31 Feb"):
                # handle such a value like any other unparsable date
                pass
        if return_str:
            return value
        return None
191