1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """Classes to handle advanced configuration in simple to complex applications.
19
20 Allows loading the configuration from a file or from command line
21 options, generating a sample configuration file, or displaying the
22 program's usage. Fills the gap between optik/optparse and ConfigParser
23 by adding data types (which are also available as a standalone optik
24 extension in the `optik_ext` module).
25
26
27 Quick start: simplest usage
28 ---------------------------
29
30 .. python ::
31
32 >>> import sys
33 >>> from logilab.common.configuration import Configuration
34 >>> options = [('dothis', {'type':'yn', 'default': True, 'metavar': '<y or n>'}),
35 ... ('value', {'type': 'string', 'metavar': '<string>'}),
36 ... ('multiple', {'type': 'csv', 'default': ('yop',),
37 ... 'metavar': '<comma separated values>',
38 ... 'help': 'you can also document the option'}),
39 ... ('number', {'type': 'int', 'default':2, 'metavar':'<int>'}),
40 ... ]
41 >>> config = Configuration(options=options, name='My config')
42 >>> print config['dothis']
43 True
44 >>> print config['value']
45 None
46 >>> print config['multiple']
47 ('yop',)
48 >>> print config['number']
49 2
50 >>> print config.help()
51 Usage: [options]
52
53 Options:
54 -h, --help show this help message and exit
55 --dothis=<y or n>
56 --value=<string>
57 --multiple=<comma separated values>
58 you can also document the option [current: none]
59 --number=<int>
60
61 >>> f = open('myconfig.ini', 'w')
62 >>> f.write('''[MY CONFIG]
63 ... number = 3
64 ... dothis = no
65 ... multiple = 1,2,3
66 ... ''')
67 >>> f.close()
68 >>> config.load_file_configuration('myconfig.ini')
69 >>> print config['dothis']
70 False
71 >>> print config['value']
72 None
73 >>> print config['multiple']
74 ['1', '2', '3']
75 >>> print config['number']
76 3
77 >>> sys.argv = ['mon prog', '--value', 'bacon', '--multiple', '4,5,6',
78 ... 'nonoptionargument']
79 >>> print config.load_command_line_configuration()
80 ['nonoptionargument']
81 >>> print config['value']
82 bacon
83 >>> config.generate_config()
84 # class for simple configurations which don't need the
85 # manager / providers model and prefer delegation to inheritance
86 #
87 # configuration values are accessible through a dict like interface
88 #
89 [MY CONFIG]
90
91 dothis=no
92
93 value=bacon
94
95 # you can also document the option
96 multiple=4,5,6
97
98 number=3
99 >>>
100 """
101 __docformat__ = "restructuredtext en"
102
103 __all__ = ('OptionsManagerMixIn', 'OptionsProviderMixIn',
104 'ConfigurationMixIn', 'Configuration',
105 'OptionsManager2ConfigurationAdapter')
106
107 import os
108 import sys
109 import re
110 from os.path import exists, expanduser
111 from copy import copy
112 from configparser import ConfigParser, NoOptionError, NoSectionError, \
113 DuplicateSectionError
114 from warnings import warn
115
116 from logilab.common.compat import callable, raw_input, str_encode as _encode
117
118 from logilab.common.textutils import normalize_text, unquote
119 from logilab.common import optik_ext as optparse
120 import collections
121
122 OptionError = optparse.OptionError
123
124 REQUIRED = []
125
127 """raised by set_option when it doesn't know what to do for an action"""
128
129
131 encoding = encoding or getattr(stream, 'encoding', None)
132 if not encoding:
133 import locale
134 encoding = locale.getpreferredencoding()
135 return encoding
136
137
138
139
141 """validate and return a converted value for option of type 'choice'
142 """
143 if not value in optdict['choices']:
144 msg = "option %s: invalid value: %r, should be in %s"
145 raise optparse.OptionValueError(msg % (name, value, optdict['choices']))
146 return value
147
149 """validate and return converted values for option of type 'multiple_choice'
150 """
151 choices = optdict['choices']
152 values = optparse.check_csv(None, name, value)
153 for value in values:
154 if not value in choices:
155 msg = "option %s: invalid value: %r, should be in %s"
156 raise optparse.OptionValueError(msg % (name, value, choices))
157 return values
158
160 """validate and return a converted value for option of type 'csv'
161 """
162 return optparse.check_csv(None, name, value)
163
165 """validate and return a converted value for option of type 'yn'
166 """
167 return optparse.check_yn(None, name, value)
168
170 """validate and return a converted value for option of type 'named'
171 """
172 return optparse.check_named(None, name, value)
173
175 """validate and return a filepath for option of type 'file'"""
176 return optparse.check_file(None, name, value)
177
179 """validate and return a valid color for option of type 'color'"""
180 return optparse.check_color(None, name, value)
181
183 """validate and return a string for option of type 'password'"""
184 return optparse.check_password(None, name, value)
185
187 """validate and return a mx DateTime object for option of type 'date'"""
188 return optparse.check_date(None, name, value)
189
191 """validate and return a time object for option of type 'time'"""
192 return optparse.check_time(None, name, value)
193
195 """validate and return an integer for option of type 'bytes'"""
196 return optparse.check_bytes(None, name, value)
197
198
# Maps an option type name to the callable validating / converting a raw
# value of that type. Entries are called either as f(optdict, name, value)
# or as f(value); see the dispatch code using this table.
VALIDATORS = {
    'string': unquote,
    'int': int,
    'float': float,
    'file': file_validator,
    'font': unquote,
    'color': color_validator,
    'regexp': re.compile,
    'csv': csv_validator,
    'yn': yn_validator,
    'bool': yn_validator,
    'named': named_validator,
    'password': password_validator,
    'date': date_validator,
    'time': time_validator,
    'bytes': bytes_validator,
    'choice': choice_validator,
    'multiple_choice': multiple_choice_validator,
}
217
219 if opttype not in VALIDATORS:
220 raise Exception('Unsupported type "%s"' % opttype)
221 try:
222 return VALIDATORS[opttype](optdict, option, value)
223 except TypeError:
224 try:
225 return VALIDATORS[opttype](value)
226 except optparse.OptionValueError:
227 raise
228 except:
229 raise optparse.OptionValueError('%s value (%r) should be of type %s' %
230 (option, value, opttype))
231
232
233
242
246
258 return input_validator
259
# Option types with a dedicated interactive input function.
INPUT_FUNCTIONS = {
    'string': input_string,
    'password': input_password,
}

# Every other known option type falls back to a function built by
# `_make_input_function`; `setdefault` keeps the dedicated entries above.
for opttype in VALIDATORS:
    INPUT_FUNCTIONS.setdefault(opttype, _make_input_function(opttype))
267
269 """monkey patch OptionParser.expand_default since we have a particular
270 way to handle defaults to avoid overriding values in the configuration
271 file
272 """
273 if self.parser is None or not self.default_tag:
274 return option.help
275 optname = option._long_opts[0][2:]
276 try:
277 provider = self.parser.options_manager._all_options[optname]
278 except KeyError:
279 value = None
280 else:
281 optdict = provider.get_option_def(optname)
282 optname = provider.option_name(optname, optdict)
283 value = getattr(provider.config, optname, optdict)
284 value = format_option_value(optdict, value)
285 if value is optparse.NO_DEFAULT or not value:
286 value = self.NO_DEFAULT_VALUE
287 return option.help.replace(self.default_tag, str(value))
288
289
def convert(value, optdict, name=''):
    """Return `value` validated according to the option's declared type.

    :param value: raw value to validate / convert
    :param optdict: option definition dictionary; its 'type' entry selects
      the validator
    :param name: option name, only used for error message formatting
    :return: `value` untouched when the option declares no type, otherwise
      the result of `_call_validator` for that type
    """
    if 'type' not in optdict:
        # untyped option: nothing to validate, hand the value back as is
        return value
    return _call_validator(optdict['type'], optdict, name, value)
301
306
323
338
357
365
382
383 format_section = ini_format_section
384
404
405
407 """MixIn to handle a configuration from both a configuration file and
408 command line options
409 """
410
def __init__(self, usage, config_file=None, version=None, quiet=0):
    """Create the parsers and the (initially empty) option registries.

    :param usage: usage string forwarded to `reset_parsers`
    :param config_file: configuration file path, stored as given
    :param version: version string forwarded to `reset_parsers`
    :param quiet: when true, the "No config file found" notice is not
      printed while reading the configuration file
    """
    self.config_file = config_file
    self.reset_parsers(usage, version=version)
    # registered options providers
    self.options_providers = []
    # option name -> provider the option was registered for
    self._all_options = {}
    # short option letter -> long option name
    self._short_options = {}
    self._nocallback_options = {}
    # option groups created so far, keyed by group name
    self._mygroups = {}
    # highest option / group level seen so far
    self._maxlevel = 0
    self.quiet = quiet
424
426
427 self.cfgfile_parser = ConfigParser()
428
429 self.cmdline_parser = optparse.OptionParser(usage=usage, version=version)
430 self.cmdline_parser.options_manager = self
431 self._optik_option_attrs = set(self.cmdline_parser.option_class.ATTRS)
432
434 """register an options provider"""
435 assert provider.priority <= 0, "provider's priority can't be >= 0"
436 for i in range(len(self.options_providers)):
437 if provider.priority > self.options_providers[i].priority:
438 self.options_providers.insert(i, provider)
439 break
440 else:
441 self.options_providers.append(provider)
442 non_group_spec_options = [option for option in provider.options
443 if 'group' not in option[1]]
444 groups = getattr(provider, 'option_groups', ())
445 if own_group and non_group_spec_options:
446 self.add_option_group(provider.name.upper(), provider.__doc__,
447 non_group_spec_options, provider)
448 else:
449 for opt, optdict in non_group_spec_options:
450 self.add_optik_option(provider, self.cmdline_parser, opt, optdict)
451 for gname, gdoc in groups:
452 gname = gname.upper()
453 goptions = [option for option in provider.options
454 if option[1].get('group', '').upper() == gname]
455 self.add_option_group(gname, gdoc, goptions, provider)
456
458 """add an option group including the listed options
459 """
460 assert options
461
462 if group_name in self._mygroups:
463 group = self._mygroups[group_name]
464 else:
465 group = optparse.OptionGroup(self.cmdline_parser,
466 title=group_name.capitalize())
467 self.cmdline_parser.add_option_group(group)
468 group.level = provider.level
469 self._mygroups[group_name] = group
470
471 if group_name != "DEFAULT":
472 self.cfgfile_parser.add_section(group_name)
473
474 for opt, optdict in options:
475 self.add_optik_option(provider, group, opt, optdict)
476
478 if 'inputlevel' in optdict:
479 warn('[0.50] "inputlevel" in option dictionary for %s is deprecated,'
480 ' use "level"' % opt, DeprecationWarning)
481 optdict['level'] = optdict.pop('inputlevel')
482 args, optdict = self.optik_option(provider, opt, optdict)
483 option = optikcontainer.add_option(*args, **optdict)
484 self._all_options[opt] = provider
485 self._maxlevel = max(self._maxlevel, option.level or 0)
486
488 """get our personal option definition and return a suitable form for
489 use with optik/optparse
490 """
491 optdict = copy(optdict)
492 others = {}
493 if 'action' in optdict:
494 self._nocallback_options[provider] = opt
495 else:
496 optdict['action'] = 'callback'
497 optdict['callback'] = self.cb_set_provider_option
498
499
500 if 'default' in optdict:
501 if (optparse.OPTPARSE_FORMAT_DEFAULT and 'help' in optdict and
502 optdict.get('default') is not None and
503 not optdict['action'] in ('store_true', 'store_false')):
504 optdict['help'] += ' [current: %default]'
505 del optdict['default']
506 args = ['--' + str(opt)]
507 if 'short' in optdict:
508 self._short_options[optdict['short']] = opt
509 args.append('-' + optdict['short'])
510 del optdict['short']
511
512 for key in list(optdict.keys()):
513 if not key in self._optik_option_attrs:
514 optdict.pop(key)
515 return args, optdict
516
518 """optik callback for option setting"""
519 if opt.startswith('--'):
520
521 opt = opt[2:]
522 else:
523
524 opt = self._short_options[opt[1:]]
525
526 if value is None:
527 value = 1
528 self.global_set_option(opt, value)
529
531 """set option on the correct option provider"""
532 self._all_options[opt].set_option(opt, value)
533
535 """write a configuration file according to the current configuration
536 into the given stream or stdout
537 """
538 options_by_section = {}
539 sections = []
540 for provider in self.options_providers:
541 for section, options in provider.options_by_section():
542 if section is None:
543 section = provider.name
544 if section in skipsections:
545 continue
546 options = [(n, d, v) for (n, d, v) in options
547 if d.get('type') is not None]
548 if not options:
549 continue
550 if not section in sections:
551 sections.append(section)
552 alloptions = options_by_section.setdefault(section, [])
553 alloptions += options
554 stream = stream or sys.stdout
555 encoding = _get_encoding(encoding, stream)
556 printed = False
557 for section in sections:
558 if printed:
559 print('\n', file=stream)
560 format_section(stream, section.upper(), options_by_section[section],
561 encoding)
562 printed = True
563
def generate_manpage(self, pkginfo, section=1, stream=None):
    """Write a man page for the current configuration.

    :param pkginfo: package information, forwarded untouched to
      `optparse.generate_manpage`
    :param section: man section number (defaults to 1)
    :param stream: output stream; `sys.stdout` is used when it is falsy
    """
    # the patched `expand_default` is only wanted while rendering, so it is
    # restored in all cases, including when generation fails
    self._monkeypatch_expand_default()
    try:
        output = stream or sys.stdout
        optparse.generate_manpage(self.cmdline_parser, pkginfo, section,
                                  stream=output, level=self._maxlevel)
    finally:
        self._unmonkeypatch_expand_default()
575
576
577
579 """initialize configuration using default values"""
580 for provider in self.options_providers:
581 provider.load_defaults()
582
587
589 """read the configuration file but do not load it (i.e. dispatching
590 values to each options provider)
591 """
592 helplevel = 1
593 while helplevel <= self._maxlevel:
594 opt = '-'.join(['long'] * helplevel) + '-help'
595 if opt in self._all_options:
596 break
597 def helpfunc(option, opt, val, p, level=helplevel):
598 print(self.help(level))
599 sys.exit(0)
600 helpmsg = '%s verbose help.' % ' '.join(['more'] * helplevel)
601 optdict = {'action' : 'callback', 'callback' : helpfunc,
602 'help' : helpmsg}
603 provider = self.options_providers[0]
604 self.add_optik_option(provider, self.cmdline_parser, opt, optdict)
605 provider.options += ( (opt, optdict), )
606 helplevel += 1
607 if config_file is None:
608 config_file = self.config_file
609 if config_file is not None:
610 config_file = expanduser(config_file)
611 if config_file and exists(config_file):
612 parser = self.cfgfile_parser
613 parser.read([config_file])
614
615 for sect, values in list(parser._sections.items()):
616 if not sect.isupper() and values:
617 parser._sections[sect.upper()] = values
618 elif not self.quiet:
619 msg = 'No config file found, using default configuration'
620 print(msg, file=sys.stderr)
621 return
622
640
642 """dispatch values previously read from a configuration file to each
643 options provider
644 """
645 parser = self.cfgfile_parser
646 for provider in self.options_providers:
647 for section, option, optdict in provider.all_options():
648 try:
649 value = parser.get(section, option)
650 provider.set_option(option, value, optdict=optdict)
651 except (NoSectionError, NoOptionError) as ex:
652 continue
653
655 """override configuration according to given parameters
656 """
657 for opt, opt_value in list(kwargs.items()):
658 opt = opt.replace('_', '-')
659 provider = self._all_options[opt]
660 provider.set_option(opt, opt_value)
661
663 """override configuration according to command line parameters
664
665 return additional arguments
666 """
667 self._monkeypatch_expand_default()
668 try:
669 if args is None:
670 args = sys.argv[1:]
671 else:
672 args = list(args)
673 (options, args) = self.cmdline_parser.parse_args(args=args)
674 for provider in list(self._nocallback_options.keys()):
675 config = provider.config
676 for attr in list(config.__dict__.keys()):
677 value = getattr(options, attr, None)
678 if value is None:
679 continue
680 setattr(config, attr, value)
681 return args
682 finally:
683 self._unmonkeypatch_expand_default()
684
685
686
687
689 """add a dummy option section for help purpose """
690 group = optparse.OptionGroup(self.cmdline_parser,
691 title=title.capitalize(),
692 description=description)
693 group.level = level
694 self._maxlevel = max(self._maxlevel, level)
695 self.cmdline_parser.add_option_group(group)
696
698
699 try:
700 self.__expand_default_backup = optparse.HelpFormatter.expand_default
701 optparse.HelpFormatter.expand_default = expand_default
702 except AttributeError:
703
704 pass
706
707 if hasattr(optparse.HelpFormatter, 'expand_default'):
708
709 optparse.HelpFormatter.expand_default = self.__expand_default_backup
710
def help(self, level=0):
    """Return the usage string for the available options.

    :param level: stored as `output_level` on the command line parser's
      formatter before the help text is rendered
    :return: whatever the command line parser's `format_help()` returns
    """
    parser = self.cmdline_parser
    parser.formatter.output_level = level
    # render with the patched `expand_default`, then always restore it
    self._monkeypatch_expand_default()
    try:
        usage_text = parser.format_help()
    finally:
        self._unmonkeypatch_expand_default()
    return usage_text
719
720
722 """used to ease late binding of default method (so you can define options
723 on the class using default methods on the configuration instance)
724 """
726 self.method = methname
727 self._inst = None
728
def bind(self, instance):
    """Bind the method to `instance`.

    Only the first binding is kept: once an instance has been recorded,
    later calls leave it untouched.
    """
    if self._inst is not None:
        return
    self._inst = instance
733
735 assert self._inst, 'unbound method'
736 return getattr(self._inst, self.method)(*args, **kwargs)
737
738
740 """Mixin to provide options to an OptionsManager"""
741
742
743 priority = -1
744 name = 'default'
745 options = ()
746 level = 0
747
749 self.config = optparse.Values()
750 for option in self.options:
751 try:
752 option, optdict = option
753 except ValueError:
754 raise Exception('Bad option: %r' % option)
755 if isinstance(optdict.get('default'), Method):
756 optdict['default'].bind(self)
757 elif isinstance(optdict.get('callback'), Method):
758 optdict['callback'].bind(self)
759 self.load_defaults()
760
762 """initialize the provider using default values"""
763 for opt, optdict in self.options:
764 action = optdict.get('action')
765 if action != 'callback':
766
767 default = self.option_default(opt, optdict)
768 if default is REQUIRED:
769 continue
770 self.set_option(opt, default, action, optdict)
771
773 """return the default value for an option"""
774 if optdict is None:
775 optdict = self.get_option_def(opt)
776 default = optdict.get('default')
777 if isinstance(default, collections.Callable):
778 default = default()
779 return default
780
782 """get the config attribute corresponding to opt
783 """
784 if optdict is None:
785 optdict = self.get_option_def(opt)
786 return optdict.get('dest', opt.replace('-', '_'))
787
789 """get the current value for the given option"""
790 return getattr(self.config, self.option_name(opt), None)
791
def set_option(self, opt, value, action=None, optdict=None):
    """Set an option registered in the options list on `self.config`.

    :param opt: option name
    :param value: new value; converted with `convert` unless it is None
    :param action: optik-like action; defaults to the option's 'action'
      entry, or 'store' when the option defines none
    :param optdict: option definition; looked up with `get_option_def`
      when not given
    :raise UnsupportedAction: when `action` is not one of the handled ones
    """
    if optdict is None:
        optdict = self.get_option_def(opt)
    if value is not None:
        value = convert(value, optdict, opt)
    if action is None:
        action = optdict.get('action', 'store')
    if optdict.get('type') == 'named':
        # 'named' values are merged into the value already stored, if any
        attrname = self.option_name(opt, optdict)
        merged = getattr(self.config, attrname, None)
        if merged:
            merged.update(value)
            value = merged
    if action == 'store':
        setattr(self.config, self.option_name(opt, optdict), value)
    elif action in ('store_true', 'count'):
        # NOTE(review): the given value is ignored here; presumably this
        # records the initial "unset" state when defaults are loaded
        setattr(self.config, self.option_name(opt, optdict), 0)
    elif action == 'store_false':
        setattr(self.config, self.option_name(opt, optdict), 1)
    elif action == 'append':
        attrname = self.option_name(opt, optdict)
        current = getattr(self.config, attrname, None)
        if current is None:
            # nothing stored yet: a sequence is stored as is, a scalar
            # starts a new list, and None is stored as None
            if isinstance(value, (list, tuple)):
                current = value
            elif value is not None:
                current = [value]
            setattr(self.config, attrname, current)
        elif isinstance(current, tuple):
            # tuples are immutable: store an extended copy
            setattr(self.config, attrname, current + (value,))
        else:
            current.append(value)
    elif action == 'callback':
        optdict['callback'](None, opt, value, None)
    else:
        raise UnsupportedAction(action)
832
853
855 """return the dictionary defining an option given its name"""
856 assert self.options
857 for option in self.options:
858 if option[0] == opt:
859 return option[1]
860 raise OptionError('no such option %s in section %r'
861 % (opt, self.name), opt)
862
863
865 """return an iterator on available options for this provider
866 option are actually described by a 3-uple:
867 (section, option name, option dictionary)
868 """
869 for section, options in self.options_by_section():
870 if section is None:
871 if self.name is None:
872 continue
873 section = self.name.upper()
874 for option, optiondict, value in options:
875 yield section, option, optiondict
876
878 """return an iterator on options grouped by section
879
880 (section, [list of (optname, optdict, optvalue)])
881 """
882 sections = {}
883 for optname, optdict in self.options:
884 sections.setdefault(optdict.get('group'), []).append(
885 (optname, optdict, self.option_value(optname)))
886 if None in sections:
887 yield None, sections.pop(None)
888 for section, options in list(sections.items()):
889 yield section.upper(), options
890
896
897
899 """basic mixin for simple configurations which don't need the
900 manager / providers model
901 """
918
927
930
932 return iter(self.config.__dict__.items())
933
935 try:
936 return getattr(self.config, self.option_name(key))
937 except (optparse.OptionValueError, AttributeError):
938 raise KeyError(key)
939
942
def get(self, key, default=None):
    """Return the current value of option `key`, or `default`.

    `default` is returned when the option lookup raises `OptionError` or
    when no value is stored on `self.config` for it.
    """
    try:
        attrname = self.option_name(key)
        return getattr(self.config, attrname)
    except (OptionError, AttributeError):
        return default
948
949
951 """class for simple configurations which don't need the
952 manager / providers model and prefer delegation to inheritance
953
954 configuration values are accessible through a dict like interface
955 """
956
957 - def __init__(self, config_file=None, options=None, name=None,
958 usage=None, doc=None, version=None):
966
967
969 """Adapt an option manager to behave like a
970 `logilab.common.configuration.Configuration` instance
971 """
973 self.config = provider
974
976 return getattr(self.config, key)
977
979 provider = self.config._all_options[key]
980 try:
981 return getattr(provider.config, provider.option_name(key))
982 except AttributeError:
983 raise KeyError(key)
984
987
def get(self, key, default=None):
    """Return the value of option `key`, or `default` when unavailable.

    :param key: option name, looked up in the wrapped manager's
      `_all_options` registry to find the owning provider
    :param default: value returned when the option is unknown to the
      manager or when its provider holds no value for it
    :return: the value stored on the provider's config, or `default`
    """
    try:
        provider = self.config._all_options[key]
    except KeyError:
        # unknown option: honour the dict-like `get` contract instead of
        # leaking the registry's KeyError to the caller
        return default
    try:
        return getattr(provider.config, provider.option_name(key))
    except AttributeError:
        return default
994
995
997 """initialize newconfig from a deprecated configuration file
998
999 possible changes:
1000 * ('renamed', oldname, newname)
1001 * ('moved', option, oldgroup, newgroup)
1002 * ('typechanged', option, oldtype, newvalue)
1003 """
1004
1005 changesindex = {}
1006 for action in changes:
1007 if action[0] == 'moved':
1008 option, oldgroup, newgroup = action[1:]
1009 changesindex.setdefault(option, []).append((action[0], oldgroup, newgroup))
1010 continue
1011 if action[0] == 'renamed':
1012 oldname, newname = action[1:]
1013 changesindex.setdefault(newname, []).append((action[0], oldname))
1014 continue
1015 if action[0] == 'typechanged':
1016 option, oldtype, newvalue = action[1:]
1017 changesindex.setdefault(option, []).append((action[0], oldtype, newvalue))
1018 continue
1019 if action[1] in ('added', 'removed'):
1020 continue
1021 raise Exception('unknown change %s' % action[0])
1022
1023 options = []
1024 for optname, optdef in newconfig.options:
1025 for action in changesindex.pop(optname, ()):
1026 if action[0] == 'moved':
1027 oldgroup, newgroup = action[1:]
1028 optdef = optdef.copy()
1029 optdef['group'] = oldgroup
1030 elif action[0] == 'renamed':
1031 optname = action[1]
1032 elif action[0] == 'typechanged':
1033 oldtype = action[1]
1034 optdef = optdef.copy()
1035 optdef['type'] = oldtype
1036 options.append((optname, optdef))
1037 if changesindex:
1038 raise Exception('unapplied changes: %s' % changesindex)
1039 oldconfig = Configuration(options=options, name=newconfig.name)
1040
1041 oldconfig.load_file_configuration(configfile)
1042
1043 changes.reverse()
1044 done = set()
1045 for action in changes:
1046 if action[0] == 'renamed':
1047 oldname, newname = action[1:]
1048 newconfig[newname] = oldconfig[oldname]
1049 done.add(newname)
1050 elif action[0] == 'typechanged':
1051 optname, oldtype, newvalue = action[1:]
1052 newconfig[optname] = newvalue
1053 done.add(optname)
1054 for optname, optdef in newconfig.options:
1055 if optdef.get('type') and not optname in done:
1056 newconfig.set_option(optname, oldconfig[optname], optdict=optdef)
1057
1058
1060 """preprocess options to remove duplicate"""
1061 alloptions = {}
1062 options = list(options)
1063 for i in range(len(options)-1, -1, -1):
1064 optname, optdict = options[i]
1065 if optname in alloptions:
1066 options.pop(i)
1067 alloptions[optname].update(optdict)
1068 else:
1069 alloptions[optname] = optdict
1070 return tuple(options)
1071