Source code for qcodes.configuration.config

import copy
import json
import logging
import os
from os.path import expanduser
from pathlib import Path
from typing import Any, Dict, Mapping, Optional, Tuple, Union

import jsonschema

logger = logging.getLogger(__name__)

EMPTY_USER_SCHEMA = "User schema at {} not found." + \
                    "User settings won't be validated"
MISS_DESC = """ Passing a description without a type does not make sense.
Description is ignored """

    "$schema": "",
    "type": "object",
    "description": "schema for a user qcodes config file",
    "properties": {},
    "required": []

[docs]class Config: """ QCoDeS config system Start with sane defaults, which you can't change, and then customize your experience using files that update the configuration. """ config_file_name = "qcodesrc.json" """Name of config file""" schema_file_name = "qcodesrc_schema.json" """Name of schema file""" # get abs path of packge config file default_file_name = str(Path(__file__).parent / config_file_name) """Filename of default config""" current_config_path = default_file_name """Path of the last loaded config file""" _loaded_config_files = [default_file_name] # get abs path of schema file schema_default_file_name = str(Path(__file__).parent / schema_file_name) """Filename of default schema""" # home dir, os independent home_file_name = expanduser(os.path.join("~", config_file_name)) """Filename of home config""" schema_home_file_name = home_file_name.replace(config_file_name, schema_file_name) """Filename of home schema""" # this is for *nix people env_file_name = os.environ.get("QCODES_CONFIG", "") """Filename of env config""" schema_env_file_name = env_file_name.replace(config_file_name, schema_file_name) """Filename of env schema""" # current working dir cwd_file_name = os.path.join(Path.cwd(), config_file_name) """Filename of cwd config""" schema_cwd_file_name = cwd_file_name.replace(config_file_name, schema_file_name) """Filename of cwd schema""" current_schema: Optional['DotDict'] = None """Validators and descriptions of config values""" current_config: Optional['DotDict'] = None """Valid config values""" defaults: 'DotDict' """The default configuration""" defaults_schema: 'DotDict' """The default schema""" _diff_config: Dict[str, Any] = {} _diff_schema: Dict[str, Any] = {} def __init__(self, path: Optional[str] = None) -> None: """ Args: path: Optional path to directory containing a `qcodesrc.json` config file """ self.config_file_path = path self.defaults, self.defaults_schema = self.load_default() self.update_config()
[docs] def load_default(self) -> Tuple['DotDict', 'DotDict']: defaults = self.load_config(self.default_file_name) defaults_schema = self.load_config(self.schema_default_file_name) self.validate(defaults, defaults_schema) return defaults, defaults_schema
[docs] def update_config(self, path: Optional[str] = None) -> Dict[str, Any]: """ Load defaults updates with cwd, env, home and the path specified and validates. A configuration file must be called qcodesrc.json A schema file must be called qcodesrc_schema.json Configuration files (and their schema) are loaded and updated from the directories in the following order: - default json config file from the repository - user json config in user home directory - user json config in $QCODES_CONFIG - user json config in current working directory - user json file in the path specified If a key/value is not specified in the user configuration the default is used. Key/value pairs loaded later will take preference over those loaded earlier. Configs are validated after every update. Validation is also performed against a user provided schema if it's found in the directory. Args: path: Optional path to directory containing a `qcodesrc.json` config file """ config = copy.deepcopy(self.defaults) self.current_schema = copy.deepcopy(self.defaults_schema) self._loaded_config_files = [self.default_file_name] self._update_config_from_file(self.home_file_name, self.schema_home_file_name, config) self._update_config_from_file(self.env_file_name, self.schema_env_file_name, config) self._update_config_from_file(self.cwd_file_name, self.schema_cwd_file_name, config) if path is not None: self.config_file_path = path if self.config_file_path is not None: config_file = os.path.join(self.config_file_path, self.config_file_name) schema_file = os.path.join(self.config_file_path, self.schema_file_name) self._update_config_from_file(config_file, schema_file, config) if config is None: raise RuntimeError("Could not load config from any of the " "expected locations.") self.current_config = config self.current_config_path = self._loaded_config_files[-1] return config
def _update_config_from_file(self, file_path: str, schema: str, config: Dict[str, Any] ) -> None: """ Updated ``config`` dictionary with config information from file in ``file_path`` that has schema specified in ``schema`` Args: file_path: Path to `qcodesrc.json` config file schema: Path to `qcodesrc_schema.json` to be used config: Config dictionary to be updated. """ if os.path.isfile(file_path): self._loaded_config_files.append(file_path) my_config = self.load_config(file_path) config = update(config, my_config) self.validate(config, self.current_schema, schema)
[docs] def validate(self, json_config: Optional[Dict[str, Any]] = None, schema: Optional[Dict[str, Any]] = None, extra_schema_path: Optional[str] = None ) -> None: """ Validate configuration; if no arguments are passed, the default validators are used. Args: json_config: json file to validate schema: schema dictionary extra_schema_path: schema path that contains extra validators to be added to schema dictionary """ if extra_schema_path is not None: # add custom validation if os.path.isfile(extra_schema_path): with open(extra_schema_path) as f: # user schema has to be both valid in itself # but then just update the user properties # so that default types and values can NEVER # be overwritten new_user = json.load(f)["properties"]["user"] if schema is None: if self.current_schema is None: raise RuntimeError("Cannot validate as " "current_schema is None") schema = self.current_schema user = schema["properties"]['user'] user["properties"].update(new_user["properties"]) jsonschema.validate(json_config, schema) else: logger.warning(EMPTY_USER_SCHEMA.format(extra_schema_path)) else: if json_config is None and schema is None: jsonschema.validate(self.current_config, self.current_schema) else: jsonschema.validate(json_config, schema)
[docs] def add(self, key: str, value: Any, value_type: Optional[str] = None, description: Optional[str] = None, default: Optional[Any] = None ) -> None: """Add custom config value in place Adds ``key``, ``value`` with optional ``value_type`` to user config and schema. If ``value_type`` is specified then the new value is validated. Args: key: key to be added under user config value: value to add to config value_type: type of value, allowed are string, boolean, integer description: description of key to add to schema default: default value, stored only in the schema Examples: >>> defaults.add("trace_color", "blue", "string", "description") will update the config: :: ... "user": { "trace_color": "blue"} ... and the schema: :: ... "user":{ "type" : "object", "description": "controls user settings of qcodes" "properties" : { "trace_color": { "description" : "description", "type": "string" } } } ... Todo: - Add enum support for value_type - finish _diffing """ if self.current_config is None: raise RuntimeError("Cannot add value to empty config") self.current_config["user"].update({key: value}) if self._diff_config.get("user", True): self._diff_config["user"] = {} self._diff_config["user"].update({key: value}) if value_type is None: if description is not None: logger.warning(MISS_DESC) else: # update schema! schema_entry: Dict[str, Dict[str, Union[str, Any]]] schema_entry = {key: {"type": value_type}} if description is not None: schema_entry = { key: { "type": value_type, "default": default, "description": description } } # the schema is nested we only update properties of the user object if self.current_schema is None: raise RuntimeError("Cannot add value as no current schema is " "set") user = self.current_schema['properties']["user"] user["properties"].update(schema_entry) self.validate(self.current_config, self.current_schema) # TODO(giulioungaretti) finish diffing # now we update the entire schema # and the entire configuration # if it's saved then it will always # take precedence even if the defaults # values are changed upstream, and the local # ones were actually left to their default # values if not self._diff_schema: self._diff_schema = BASE_SCHEMA props = self._diff_schema['properties'] if props.get("user", True): props["user"] = {} props["user"].update(schema_entry)
[docs] @staticmethod def load_config(path: str) -> 'DotDict': """Load a config JSON file Args: path: path to the config file Return: a dot accessible dictionary config object Raises: FileNotFoundError: if config is missing """ with open(path) as fp: config = json.load(fp) logger.debug(f'Loading config from {path}') config_dot_dict = DotDict(config) return config_dot_dict
[docs] def save_config(self, path: str) -> None: """ Save current config to file at given path. Args: path: path of new file """ with open(path, "w") as fp: json.dump(self.current_config, fp, indent=4)
[docs] def save_schema(self, path: str) -> None: """ Save current schema to file at given path. Args: path: path of new file """ with open(path, "w") as fp: json.dump(self.current_schema, fp, indent=4)
[docs] def save_to_home(self) -> None: """Save config and schema to files in home dir""" self.save_config(self.home_file_name) self.save_schema(self.schema_home_file_name)
[docs] def save_to_env(self) -> None: """Save config and schema to files in path specified in env variable""" self.save_config(self.env_file_name) self.save_schema(self.schema_env_file_name)
[docs] def save_to_cwd(self) -> None: """Save config and schema to files in current working dir""" self.save_config(self.cwd_file_name) self.save_schema(self.schema_cwd_file_name)
[docs] def describe(self, name: str) -> str: """ Describe a configuration entry Args: name: name of entry to describe in 'dotdict' notation, e.g. name="user.scriptfolder" """ val = self.current_config if val is None: raise RuntimeError(f"Config is empty, cannot describe entry.") if self.current_schema is None: raise RuntimeError("No schema found, cannot describe entry.") sch = self.current_schema["properties"] for key in name.split('.'): if val is None: raise RuntimeError(f"Cannot describe {name} Some part of it " f"is null") val = val[key] if sch.get(key): sch = sch[key] else: sch = sch['properties'][key] description = sch.get("description", None) or "Generic value" _type = str(sch.get("type", None)) or "Not defined" default = sch.get("default", None) or "Not defined" # add cool description to docstring base_docstring = """{}.\nCurrent value: {}. Type: {}. Default: {}.""" doc = base_docstring.format(description, val, _type, default) return doc
def __getitem__(self, name: str) -> Any: val = self.current_config for key in name.split('.'): if val is None: raise KeyError(f"{name} not found in current config") val = val[key] return val def __getattr__(self, name: str) -> Any: return getattr(self.current_config, name) def __repr__(self) -> str: old = super().__repr__() output = (f"Current values: \n {self.current_config} \n" f"Current paths: \n {self._loaded_config_files} \n" f"{old}") return output
[docs]class DotDict(Dict[str, Any]): """ Wrapper dict that allows to get dotted attributes Requires keys to be strings. """ def __init__(self, value: Optional[Mapping[str, Any]] = None): if value is None: pass else: for key in value: self.__setitem__(key, value[key]) def __setitem__(self, key: str, value: Any) -> None: if '.' in key: myKey, restOfKey = key.split('.', 1) target = self.setdefault(myKey, DotDict()) target[restOfKey] = value else: if isinstance(value, dict) and not isinstance(value, DotDict): value = DotDict(value) dict.__setitem__(self, key, value) def __getitem__(self, key: str) -> Any: if '.' not in key: return dict.__getitem__(self, key) myKey, restOfKey = key.split('.', 1) target = dict.__getitem__(self, myKey) return target[restOfKey] def __contains__(self, key: object) -> bool: if not isinstance(key, str): return False if '.' not in key: return super().__contains__(key) myKey, restOfKey = key.split('.', 1) target = dict.__getitem__(self, myKey) return restOfKey in target def __deepcopy__(self, memo: Optional[Dict[Any, Any]]) -> 'DotDict': return DotDict(copy.deepcopy(dict(self)))
[docs] def __getattr__(self, name: str) -> Any: """ Overwrite ``__getattr__`` to provide dot access """ return self.__getitem__(name)
[docs] def __setattr__(self, key: str, value: Any) -> None: """ Overwrite ``__setattr__`` to provide dot access """ self.__setitem__(key, value)
def update(d: Dict[Any, Any], u: Mapping[Any, Any]) -> Dict[Any, Any]: for k, v in u.items(): if isinstance(v, r = update(d.get(k, {}), v) d[k] = r else: d[k] = u[k] return d