Package VMBuilder :: Package tests :: Module disk_tests
[frames] | no frames]

Source Code for Module VMBuilder.tests.disk_tests

  1  # 
  2  #    Uncomplicated VM Builder 
  3  #    Copyright (C) 2007-2009 Canonical Ltd. 
  4  # 
  5  #    See AUTHORS for list of contributors 
  6  # 
  7  #    This program is free software: you can redistribute it and/or modify 
  8  #    it under the terms of the GNU General Public License version 3, as 
  9  #    published by the Free Software Foundation. 
 10  # 
 11  #    This program is distributed in the hope that it will be useful, 
 12  #    but WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 14  #    GNU General Public License for more details. 
 15  # 
 16  #    You should have received a copy of the GNU General Public License 
 17  #    along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 18  import os 
 19  import stat 
 20  import tempfile 
 21  import unittest 
 22  import testtools 
 23   
 24  import VMBuilder 
 25  from VMBuilder.disk import detect_size, parse_size, index_to_devname, devname_to_index, Disk 
 26  from VMBuilder.exception import VMBuilderException, VMBuilderUserError 
 27  from VMBuilder.util import run_cmd 
 28   
 29  TestSkipped = testtools.testcase.TestSkipped 
 30  TestCase = testtools.TestCase 
31 32 -def get_temp_filename():
33 (fd, tmpfile) = tempfile.mkstemp() 34 os.close(fd) 35 return tmpfile
36
37 -class MockDistro(object):
39 return True
40
41 -class MockHypervisor(object):
42 - def __init__(self):
43 self.disks = [] 44 self.distro = MockDistro()
45
46 - def add_clean_cb(self, *args, **kwargs):
47 pass
48
49 - def add_disk(self, *args, **kwargs):
50 disk = Disk(self, *args, **kwargs) 51 self.disks.append(disk) 52 return disk
53
54 -class TestSizeParser(TestCase):
56 "Suffixes in size strings are case-insensitive" 57 58 for letter in ['K', 'M', 'G']: 59 self.assertEqual(parse_size('1%s' % letter), parse_size('1%s' % letter.lower()))
60
62 "Suffix-less size string are counted as megabytes" 63 self.assertEqual(parse_size(10), 10) 64 self.assertEqual(parse_size('10'), 10)
65
67 "Sizes with M suffix are counted as megabytes" 68 self.assertEqual(parse_size('10M'), 10)
69
71 "1G is counted as 1024 megabytes" 72 self.assertEqual(parse_size('1G'), 1024)
73
75 "1024K is counted as 1 megabyte" 76 self.assertEqual(parse_size('1024K'), 1)
77
79 "parse_size rounds to nearest MB" 80 self.assertEqual(parse_size('1025K'), 1) 81 self.assertEqual(parse_size('10250K'), 10)
82
83 -class TestSequenceFunctions(TestCase):
84 - def test_index_to_devname(self):
85 self.assertEqual(index_to_devname(0), 'a') 86 self.assertEqual(index_to_devname(26), 'aa') 87 self.assertEqual(index_to_devname(18277), 'zzz')
88
89 - def test_devname_to_index(self):
90 self.assertEqual(devname_to_index('a'), 0) 91 self.assertEqual(devname_to_index('b'), 1) 92 self.assertEqual(devname_to_index('aa'), 26) 93 self.assertEqual(devname_to_index('ab'), 27) 94 self.assertEqual(devname_to_index('z'), 25) 95 self.assertEqual(devname_to_index('zz'), 701) 96 self.assertEqual(devname_to_index('zzz'), 18277)
97
99 for i in range(18277): 100 self.assertEqual(i, devname_to_index(index_to_devname(i)))
101
102 -class TestDetectSize(TestCase):
103 - def setUp(self):
104 TestCase.setUp(self) 105 self.tmpfile = get_temp_filename() 106 run_cmd('qemu-img', 'create', self.tmpfile, '5G') 107 self.imgdev = None
108
109 - def test_detect_size_file(self):
110 self.assertTrue(detect_size(self.tmpfile), 5*1024)
111 112 @testtools.skipIf(os.geteuid() != 0, 'Needs root to run')
114 self.imgdev = run_cmd('losetup', '-f', '--show', self.tmpfile).strip() 115 self.assertTrue(detect_size(self.imgdev), 5*1024)
116
117 - def test_detect_size_fifo(self):
118 os.unlink(self.tmpfile) 119 os.mkfifo(self.tmpfile) 120 self.assertRaises(VMBuilderException, detect_size, self.tmpfile)
121
122 - def tearDown(self):
123 TestCase.tearDown(self) 124 run_cmd('udevadm', 'settle') 125 if self.imgdev: 126 run_cmd('losetup', '-d', self.imgdev) 127 os.unlink(self.tmpfile)
128
129 -class TestDiskPlugin(TestCase):
130 - def test_disk_filename(self):
131 tmpfile = get_temp_filename() 132 os.unlink(tmpfile) 133 134 disk = Disk(MockHypervisor(), tmpfile, size='1G') 135 disk.create() 136 self.assertTrue(os.path.exists(tmpfile)) 137 os.unlink(tmpfile)
138
139 - def test_disk_size(self):
140 # parse_size only deals with MB resolution 141 K = 1024 142 M = K*1024 143 G = M*1024 144 sizes = [('10G', 10*G), 145 ('400M', 400*M), 146 ('345', 345*M), 147 ('10240k', 10*M), 148 ('10250k', 10*M), 149 ('10230k', 9*M)] 150 151 for (sizestr, size) in sizes: 152 tmpfile = get_temp_filename() 153 os.unlink(tmpfile) 154 155 disk = Disk(MockHypervisor(), filename=tmpfile, size=sizestr) 156 disk.create() 157 actual_size = os.stat(tmpfile)[stat.ST_SIZE] 158 self.assertEqual(size, actual_size, 'Asked for %s, expected %d, got %d' % (sizestr, size, actual_size)) 159 os.unlink(tmpfile)
160
161 - def test_disk_no_size_given(self):
162 tmpname = get_temp_filename() 163 os.unlink(tmpname) 164 165 self.assertRaises(VMBuilderUserError, Disk, MockHypervisor(), filename=tmpname)
166
168 tmpname = get_temp_filename() 169 170 self.assertRaises(VMBuilderUserError, Disk, MockHypervisor(), filename=tmpname, size='1G')
171
173 tmpfile = get_temp_filename() 174 175 fp = open(tmpfile, 'w') 176 fp.write('canary') 177 fp.close() 178 179 disk = Disk(MockHypervisor(), tmpfile) 180 disk.create() 181 fp = open(tmpfile, 'r') 182 self.assertEqual(fp.read(), 'canary') 183 fp.close() 184 os.unlink(tmpfile)
185
186 - def test_devletters(self):
187 from string import ascii_lowercase 188 189 hypervisor = MockHypervisor() 190 for (expected_devletter, index) in zip(ascii_lowercase, range(len(ascii_lowercase))): 191 tmpname = get_temp_filename() 192 disk = hypervisor.add_disk(filename=tmpname) 193 devletters = disk.devletters() 194 self.assertEqual(devletters, expected_devletter, 'Disk no. %d returned %s, expected %s.' % (index, devletters, expected_devletter))
195
196 -class TestDiskPartitioningPlugin(TestCase):
197 - def setUp(self):
198 TestCase.setUp(self) 199 self.tmpfile = get_temp_filename() 200 os.unlink(self.tmpfile) 201 202 self.vm = MockHypervisor() 203 self.disk = self.vm.add_disk(self.tmpfile, size='1G') 204 self.disk.create()
205
206 - def tearDown(self):
207 TestCase.tearDown(self) 208 os.unlink(self.tmpfile)
209
210 - def test_partition_overlap(self):
211 self.disk.add_part(1, 512, 'ext3', '/') 212 self.assertRaises(VMBuilderUserError, self.disk.add_part, 512, 512, 'ext3', '/mnt')
213
215 self.assertRaises(VMBuilderUserError, self.disk.add_part, 512, 514, 'ext3', '/')
216
218 from VMBuilder.util import run_cmd 219 220 file_output = run_cmd('file', self.tmpfile) 221 self.assertEqual('%s: data' % self.tmpfile, file_output.strip()) 222 self.disk.partition() 223 file_output = run_cmd('file', self.tmpfile) 224 self.assertEqual('%s: x86 boot sector, code offset 0xb8' % self.tmpfile, file_output.strip()) 225 226 file_output = run_cmd('parted', '--script', self.tmpfile, 'print') 227 self.assertEqual('''Model: (file) 228 Disk %s: 1074MB 229 Sector size (logical/physical): 512B/512B 230 Partition Table: msdos 231 232 Number Start End Size Type File system Flags''' % self.tmpfile, file_output.strip())
233
235 from VMBuilder.util import run_cmd 236 237 self.disk.add_part(1, 1023, 'ext3', '/') 238 self.disk.partition() 239 file_output = run_cmd('parted', '--script', self.tmpfile, 'print') 240 self.assertEqual('''Model: (file) 241 Disk %s: 1074MB 242 Sector size (logical/physical): 512B/512B 243 Partition Table: msdos 244 245 Number Start End Size Type File system Flags 246 1 1049kB 1023MB 1022MB primary''' % self.tmpfile, file_output.strip())
247 248 @testtools.skipIf(os.geteuid() != 0, 'Needs root to run')
249 - def test_map_partitions(self):
250 self.disk.add_part(1, 1023, 'ext3', '/') 251 self.disk.partition() 252 self.disk.map_partitions() 253 try: 254 from VMBuilder.disk import detect_size 255 self.assertEqual(detect_size(self.disk.partitions[0].filename), 1023000576) 256 except: 257 raise 258 finally: 259 self.disk.unmap()
260 261 @testtools.skipIf(os.geteuid() != 0, 'Needs root to run')
262 - def test_mkfs(self):
263 self.disk.add_part(1, 1023, 'ext3', '/') 264 self.disk.partition() 265 self.disk.map_partitions() 266 try: 267 self.disk.mkfs() 268 except: 269 raise 270 finally: 271 self.disk.unmap()
272
273 - def test_get_grub_id(self):
274 self.assertEqual(self.disk.get_grub_id(), '(hd0)') 275 276 tmpfile2 = get_temp_filename() 277 os.unlink(tmpfile2) 278 disk2 = self.vm.add_disk(tmpfile2, '1G') 279 self.assertEqual(self.disk.get_grub_id(), '(hd0)') 280 self.assertEqual(disk2.get_grub_id(), '(hd1)')
281
282 - def test_get_index(self):
283 self.assertEqual(self.disk.get_index(), 0) 284 285 tmpfile2 = get_temp_filename() 286 os.unlink(tmpfile2) 287 disk2 = self.vm.add_disk(tmpfile2, '1G') 288 self.assertEqual(self.disk.get_index(), 0) 289 self.assertEqual(disk2.get_index(), 1)
290