1 """Help to generate SQL strings usable by the Python DB-API.
2
3 :author: Logilab
4 :copyright: 2000-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
6 :license: General Public License version 2 - http://www.gnu.org/licenses
7 """
8 __docformat__ = "restructuredtext en"
9
10
11
13 """
14 Helper class to generate SQL strings to use with python's DB-API.
15 """
16
def where(self, keys, addon=None):
    """Build the body of a WHERE clause (without the WHERE keyword).

    Each key yields a ``key = %(key)s`` restriction; restrictions are
    combined with AND.

    :param keys: list of keys
    :param addon: optional restriction placed before the generated ones;
      ignored when empty or None
    :return: the restrictions as one string, empty if there are none

    >>> s = SQLGenerator()
    >>> s.where(['nom'])
    'nom = %(nom)s'
    >>> s.where(['nom','prenom'])
    'nom = %(nom)s AND prenom = %(prenom)s'
    >>> s.where(['nom','prenom'], 'x.id = y.id')
    'x.id = y.id AND nom = %(nom)s AND prenom = %(prenom)s'
    """
    generated = []
    for key in keys:
        generated.append("%s = %%(%s)s" % (key, key))
    # The extra restriction, when given, always comes first.
    clauses = [addon] + generated if addon else generated
    return " AND ".join(clauses)
33
def set(self, keys):
    """Build the assignment list of an UPDATE statement.

    Each key yields a ``key = %(key)s`` assignment; assignments are
    separated by commas.

    :param keys: list of keys
    :return: the assignments as one string, empty if `keys` is empty

    >>> s = SQLGenerator()
    >>> s.set(['nom'])
    'nom = %(nom)s'
    >>> s.set(['nom','prenom'])
    'nom = %(nom)s, prenom = %(prenom)s'
    """
    assignments = []
    for column in keys:
        assignments.append("%s = %%(%s)s" % (column, column))
    return ", ".join(assignments)
45
def insert(self, table, params):
    """Build an INSERT statement with one named placeholder per key.

    :param table: name of the table
    :param params: dictionary that will be used as in cursor.execute(sql,params)
    :return: the SQL string; columns appear in the dictionary's iteration
      order

    >>> s = SQLGenerator()
    >>> s.insert('test',{'nom':'dupont'})
    'INSERT INTO test ( nom ) VALUES ( %(nom)s )'
    >>> s.insert('test',{'nom':'dupont','prenom':'jean'})
    'INSERT INTO test ( nom, prenom ) VALUES ( %(nom)s, %(prenom)s )'
    """
    column_list = ', '.join(params.keys())
    placeholders = []
    for name in params:
        placeholders.append("%%(%s)s" % name)
    return 'INSERT INTO %s ( %s ) VALUES ( %s )' % (
        table, column_list, ', '.join(placeholders))
61
def select(self, table, params):
    """Build a ``SELECT *`` statement restricted on every key of `params`.

    :param table: name of the table
    :param params: dictionary that will be used as in cursor.execute(sql,params)
    :return: the SQL string; the WHERE clause is left out when `params`
      has no key

    >>> s = SQLGenerator()
    >>> s.select('test',{})
    'SELECT * FROM test'
    >>> s.select('test',{'nom':'dupont'})
    'SELECT * FROM test WHERE nom = %(nom)s'
    >>> s.select('test',{'nom':'dupont','prenom':'jean'})
    'SELECT * FROM test WHERE nom = %(nom)s AND prenom = %(prenom)s'
    """
    query = 'SELECT * FROM %s' % table
    restriction = self.where(params.keys())
    if not restriction:
        # Nothing to filter on: select the whole table.
        return query
    return query + ' WHERE %s' % restriction
80
def adv_select(self, model, tables, params, joins=None):
    """Build a SELECT statement over explicit columns and aliased tables.

    :param model: list of columns to select
    :param tables: list of (table name, alias) pairs used in from
    :param params: dictionary that will be used as in cursor.execute(sql, params)
    :param joins: optional restriction(s) to insert at the start of the
      where clause. Usually used to perform joins. Either a single
      string, or an iterable of strings which are combined with AND.
    :return: the SQL string; the WHERE clause is left out when there is
      neither a join restriction nor a key in `params`

    >>> s = SQLGenerator()
    >>> s.adv_select(['column'],[('test', 't')], {})
    'SELECT column FROM test AS t'
    >>> s.adv_select(['column'],[('test', 't')], {'nom':'dupont'})
    'SELECT column FROM test AS t WHERE nom = %(nom)s'
    """
    table_names = ["%s AS %s" % (name, alias) for name, alias in tables]
    sql = 'SELECT %s FROM %s' % (', '.join(model), ', '.join(table_names))
    # isinstance, rather than an exact type comparison, keeps str
    # subclasses (and unicode strings on Python 2) from being treated as a
    # sequence and split into single characters joined by AND.
    if joins and not isinstance(joins, (str, type(u''))):
        joins = ' AND '.join(joins)
    where = self.where(params.keys(), joins)
    if where:
        sql = sql + ' WHERE %s' % where
    return sql
103
def delete(self, table, params):
    """Build a DELETE statement restricted on every key of `params`.

    The WHERE keyword is always emitted, so an empty `params` yields a
    statement with an empty restriction.

    :param table: name of the table
    :param params: dictionary that will be used as in cursor.execute(sql,params)
    :return: the SQL string

    >>> s = SQLGenerator()
    >>> s.delete('test',{'nom':'dupont'})
    'DELETE FROM test WHERE nom = %(nom)s'
    >>> s.delete('test',{'nom':'dupont','prenom':'jean'})
    'DELETE FROM test WHERE nom = %(nom)s AND prenom = %(prenom)s'
    """
    restriction = self.where(params.keys())
    return 'DELETE FROM %s WHERE %s' % (table, restriction)
118
def update(self, table, params, unique):
    """Build an UPDATE statement.

    Keys listed in `unique` form the WHERE clause; every other key of
    `params` becomes an assignment in the SET clause.

    :param table: name of the table
    :param params: dictionary that will be used as in cursor.execute(sql,params)
    :param unique: list of keys identifying the row(s) to update
    :return: the SQL string

    >>> s = SQLGenerator()
    >>> s.update('test', {'id':'001','nom':'dupont'}, ['id'])
    'UPDATE test SET nom = %(nom)s WHERE id = %(id)s'
    >>> s.update('test',{'id':'001','nom':'dupont','prenom':'jean'},['id'])
    'UPDATE test SET nom = %(nom)s, prenom = %(prenom)s WHERE id = %(id)s'
    """
    restriction = self.where(unique)
    assigned_columns = []
    for name in params:
        if name in unique:
            # Identifying keys belong to the WHERE clause only.
            continue
        assigned_columns.append(name)
    assignments = self.set(assigned_columns)
    return 'UPDATE %s SET %s WHERE %s' % (table, assignments, restriction)
134
136 """
137 Another helper class to ease SQL table manipulation.
138 """
139
140
141
142
143
def __init__(self, table_name, table_fields, primary_key=None):
    """Record the table layout and precompute its SQL statement templates.

    Every field name also becomes an instance attribute initialised to
    None.

    :param table_name: name of the table
    :param table_fields: sequence of (field name, type code) pairs; the
      type code is passed to sql_repr to render the field's placeholder
    :param primary_key: name of the key column; defaults to the name of
      the first field
    :raise ValueError: if a field name is already an attribute of the
      instance
    """
    if primary_key is None:
        self._primary_key = table_fields[0][0]
    else:
        self._primary_key = primary_key

    self._table_fields = table_fields
    self._table_name = table_name
    info = {
        'key': self._primary_key,
        'table': self._table_name,
        'columns': ",".join([f for f, t in self._table_fields]),
        'values': ",".join([sql_repr(t, "%%(%s)s" % f)
                            for f, t in self._table_fields]),
        'updates': ",".join(["%s=%s" % (f, sql_repr(t, "%%(%s)s" % f))
                             for f, t in self._table_fields]),
    }
    # The statements below are templates: "%%(key)s" survives this first
    # formatting pass as a "%(key)s" placeholder to be filled in later.
    # NOTE(review): the key placeholder is not passed through sql_repr,
    # unlike the field placeholders -- confirm this is intended for
    # string-typed keys.

    # An INSERT statement takes no WHERE clause.
    self._insert_stmt = ("INSERT into %(table)s (%(columns)s) "
                         "VALUES (%(values)s)") % info
    # UPDATE syntax is "SET col=value, ... WHERE ...": the assignment
    # list is not parenthesised and there is no VALUES keyword.
    self._update_stmt = ("UPDATE %(table)s SET %(updates)s "
                         "WHERE %(key)s=%%(key)s") % info
    self._select_stmt = ("SELECT %(columns)s FROM %(table)s "
                         "WHERE %(key)s=%%(key)s") % info
    self._delete_stmt = ("DELETE FROM %(table)s "
                         "WHERE %(key)s=%%(key)s") % info

    for k, t in table_fields:
        # Refuse field names that would overwrite an existing attribute
        # or method of the instance.
        if hasattr(self, k):
            raise ValueError("Cannot use %s as a table field" % k)
        setattr(self, k, None)
174
175
177 d = {}
178 for k, t in self._table_fields:
179 d[k] = getattr(self, k)
180 return d
181
183 d = { 'key' : getattr(self,self._primary_key) }
184 cursor.execute(self._select_stmt % d)
185 rows = cursor.fetchall()
186 if len(rows)!=1:
187 msg = "Select: ambiguous query returned %d rows"
188 raise ValueError(msg % len(rows))
189 for (f, t), v in zip(self._table_fields, rows[0]):
190 setattr(self, f, v)
191
195
197 d = { 'key' : getattr(self,self._primary_key) }
198
199
200
201
203 """
204 Take a cursor and a list of records fetched with that cursor, then return a
205 list of dictionaries (one for each record) whose keys are column names and
206 values are records' values.
207
208 :param cursor: cursor used to execute the query
209 :param records: list returned by fetch*()
210 """
211 result = []
212 for record in records :
213 record_dict = {}
214 for i in range(len(record)) :
215 record_dict[cursor.description[i][0]] = record[i]
216 result.append(record_dict)
217 return result
218
220 if type == 's':
221 return "'%s'" % (val,)
222 else:
223 return val
224
225
if __name__ == "__main__":
    # Run the doctests of logilab.common.sqlgen and print the summary.
    import doctest
    from logilab.common import sqlgen
    # Parenthesised so the line is valid both as a Python 2 print statement
    # and as a Python 3 print() call.
    print(doctest.testmod(sqlgen))
230