Source code for pywbem_mock._baseprovider

# (C) Copyright 2018
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
# Author: Karl  Schopmeyer <>

A CIM provider creates WBEM server responses to operations defined in DSP0200.

The BaseProvider module contains methods required by both builtin
providers and user-defined providers. This module includes methods for the
following functionality:

  * Managing namespaces in the CIM repository
  * Methods that provide access to specific objects in the CIM repository
    including the processing consistent with filtering the returned objects.
    For example, `get_class(...)` is the internal equivalent of the GetClass
    operation.

  BaseProvider does not have direct access to the common methods
  defined in _wbemconnection_mock

from pywbem._vendor.nocaselist import NocaseList
from pywbem import CIMError, CIM_ERR_ALREADY_EXISTS, \
    CIM_ERR_INVALID_NAMESPACE, CIM_ERR_NAMESPACE_NOT_EMPTY, \
    CIM_ERR_NOT_FOUND, CIMInstance, CIMClass, WBEMServer
from pywbem._nocasedict import NocaseDict
from pywbem._utils import _format

# pywbem_mock implementation configuration variables that are used in
# request responders.

__all__ = ['BaseProvider']

class BaseProvider:
    """
    BaseProvider is the top level class in the provider hierarchy and includes
    methods required by both builtin providers and user-defined providers.

    This class is not intended to be executed directly.
    """

    def __init__(self, cimrepository):
        """
        Parameters:

          cimrepository (:class:`~pywbem_mock.BaseRepository` or subclass):
            Defines the repository to be used by providers. The repository is
            fully initialized but contains only objects defined by the
            :class:`~pywbem_mock.FakedWBEMConnection` methods that create
            objects in the repository (ex.
            :meth:`~pywbem_mock.FakedWBEMConnection.compile_mof_file`)
        """
        self._cimrepository = cimrepository
        # Case-insensitive list of the names that may be used for the
        # Interop namespace.
        self._interop_namespace_names = \
            NocaseList(WBEMServer.INTEROP_NAMESPACES)

    def __repr__(self):
        """
        Return a string representation of the provider for debugging.

        NOTE(review): `provider_classnames` is not set anywhere in this
        class; presumably subclasses define it. Confirm before calling
        repr() on a plain BaseProvider.
        """
        return _format(
            "{s.__class__.__name__}("
            "provider_classnames={s.provider_classnames!A}, "
            "_interop_namespace_names={s._interop_namespace_names!A}, "
            "_cim_repository={s._cimrepository})",
            s=self)

    @property
    def cimrepository(self):
        """
        :class:`~pywbem_mock.BaseRepository` or subclass:
        Instance of class that represents the CIM repository.

        The CIM repository is the data store for CIM classes, CIM instances,
        and CIM qualifier declarations. All access to the mocker CIM data
        must pass through this variable to the CIM repository.
        See :class:`~pywbem_mock.BaseRepository` for a description of the
        repository interface.

        The mocker may use subclasses of
        :class:`~pywbem_mock.BaseRepository` for specific functionality.
        Thus, :class:`~pywbem_mock.InMemoryRepository` implements an InMemory
        CIM repository that must be initialized for each
        :class:`~pywbem_mock.FakedWBEMConnection` construction.
        """
        return self._cimrepository

    @property
    def namespaces(self):
        """
        :term:`NocaseList` of :term:`string`:
        The names of the namespaces that exist in the CIM repository.
        """
        ns_list = self.cimrepository.namespaces
        assert isinstance(ns_list, NocaseList)
        return ns_list

    @property
    def interop_namespace_names(self):
        """
        :term:`NocaseList` of :term:`string`:
        The valid Interop namespace names.

        Only these names may be the Interop namespace and only one
        Interop namespace may exist in a WBEM server environment.
        This list is defined in :attr:`pywbem.WBEMServer.INTEROP_NAMESPACES`.
        """
        ns_list = self._interop_namespace_names
        assert isinstance(ns_list, NocaseList)
        return ns_list

    ################################################################
    #
    #   Methods to manage namespaces
    #
    ################################################################

    def validate_namespace(self, namespace):
        """
        Validates that a namespace is defined in the CIM repository.
        Returns only if namespace is valid. Otherwise it generates an
        exception.

        Parameters:

          namespace (:term:`string`):
            The name of the CIM namespace in the CIM repository (case
            insensitive). Must not be `None`. Any leading or trailing
            slash characters are ignored.

        Raises:

          :exc:`~pywbem.CIMError`: (CIM_ERR_INVALID_NAMESPACE)
            Namespace does not exist.
        """
        try:
            self.cimrepository.validate_namespace(namespace)
        except KeyError as exc:
            # The repository signals an unknown namespace with KeyError;
            # translate it into the CIM status callers expect.
            raise CIMError(
                CIM_ERR_INVALID_NAMESPACE,
                _format("Namespace does not exist in CIM repository: {0!A}",
                        namespace)) from exc

    def add_namespace(self, namespace, verbose=False):
        """
        Add a CIM namespace to the CIM repository.

        The namespace must not yet exist in the CIM repository and the
        repository can contain only one Interop namespace.

        An Interop namespace can be added with this method, if an Interop
        namespace does not yet exist in the CIM repository.

        Parameters:

          namespace (:term:`string`):
            The name of the CIM namespace in the CIM repository. Must not be
            `None`. Any leading or trailing slash characters are removed
            before the string is used to define the namespace name.

          verbose (:class:`py:bool`):
            Verbose mode: Print a message about the namespace creation.

        Raises:

          ValueError: Namespace argument must not be None.
          :exc:`~pywbem.CIMError`: CIM_ERR_ALREADY_EXISTS if the namespace
            already exists in the CIM repository.
          :exc:`~pywbem.CIMError`: CIM_ERR_ALREADY_EXISTS if the namespace
            is one of the possible Interop namespace names and an interop
            namespace already exists in the CIM repository.
        """
        if namespace is None:
            raise ValueError("Namespace argument must not be None")

        namespace = namespace.strip('/')  # Just for appearance in exc messages

        # Cannot add more than one of the possible Interop namespace names
        if self.is_interop_namespace(namespace):
            existing_interop_ns = self.find_interop_namespace()
            if existing_interop_ns:
                raise CIMError(
                    CIM_ERR_ALREADY_EXISTS,
                    _format("An Interop namespace {0!A} already exists in the "
                            "CIM repository. {1!A} cannot be added. ",
                            existing_interop_ns, namespace))

        if verbose:
            print(f"Creating namespace {namespace} (in mock support)")

        try:
            self.cimrepository.add_namespace(namespace)
        except ValueError as exc:
            # The repository reports a duplicate namespace with ValueError.
            raise CIMError(
                CIM_ERR_ALREADY_EXISTS,
                _format("Namespace {0!A} already exists in the CIM repository ",
                        namespace)) from exc

    def remove_namespace(self, namespace, verbose=False):
        """
        Remove a CIM namespace from the CIM repository.

        The namespace must exist in the CIM repository and must be empty.

        The Interop namespace cannot be removed from the CIM repository.

        Parameters:

          namespace (:term:`string`):
            The name of the CIM namespace in the CIM repository (case
            insensitive). Must not be `None`. Leading or trailing
            slash characters are ignored.

          verbose (:class:`py:bool`):
            Verbose mode: Print a message about the namespace deletion.

        Raises:

          ValueError: Namespace argument must not be None.
          :exc:`~pywbem.CIMError`: CIM_ERR_NOT_FOUND if the namespace does
            not exist in the CIM repository.
          :exc:`~pywbem.CIMError`: CIM_ERR_INVALID_NAMESPACE if the Interop
            namespace was specified.
          :exc:`~pywbem.CIMError`: CIM_ERR_NAMESPACE_NOT_EMPTY if the
            repository refuses the removal with ValueError (reported as
            "contains objects"). NOTE(review): earlier documentation also
            listed removal of the default connection namespace under this
            status; that check is not visible in this class - confirm in the
            repository implementation.
        """
        if namespace is None:
            raise ValueError("Namespace argument must not be None")

        namespace = namespace.strip('/')

        if namespace not in self.cimrepository.namespaces:
            raise CIMError(
                CIM_ERR_NOT_FOUND,
                _format("Namespace {0!A} does not exist in the CIM repository ",
                        namespace))

        if self.is_interop_namespace(namespace):
            raise CIMError(
                CIM_ERR_INVALID_NAMESPACE,
                _format("The Interop namespace {0!A} cannot be removed from "
                        "the CIM repository.", namespace))

        if verbose:
            print(f"Deleting namespace {namespace} (in mock support)")

        try:
            self.cimrepository.remove_namespace(namespace)
        # KeyError cannot happen because existence was already verified
        except ValueError as exc:
            raise CIMError(
                CIM_ERR_NAMESPACE_NOT_EMPTY,
                _format("Namespace {0!A} contains objects.", namespace)) \
                from exc

    def is_interop_namespace(self, namespace):
        # pylint: disable=no-self-use
        """
        Tests if a namespace name is a valid Interop namespace name.

        This method does not access the CIM repository for this test; it
        merely compares the specified namespace name against the list of
        valid Interop namespace names returned by
        :attr:`interop_namespace_names`.

        Parameters:

          namespace (:term:`string`):
            The namespace name that is to be tested.

        Returns:

          :class:`py:bool`: Indicates whether the namespace name is a valid
          Interop namespace name.
        """
        return namespace in self.interop_namespace_names

    def find_interop_namespace(self):
        """
        Find the Interop namespace name in the CIM repository, or return
        `None`.

        The Interop namespace is identified by comparing all namespace names
        in the CIM repository against the list of valid Interop namespace
        names returned by :attr:`interop_namespace_names`.

        Returns:

          :term:`string`: The name of the Interop namespace if one exists in
          the CIM repository or `None`.
        """
        for ns_name in self.cimrepository.namespaces:
            if self.is_interop_namespace(ns_name):
                return ns_name
        return None

    ################################################################
    #
    #   Common Repository access methods used by MainProvider and
    #   InstanceProviders
    #
    ################################################################

    @staticmethod
    def _remove_qualifiers(obj):
        """
        Remove all qualifiers from the input object where the object may
        be a CIMInstance or CIMClass. Removes qualifiers from the object and
        from properties, methods, and parameters.

        This is used to process the IncludeQualifier parameter for CIMClass
        and CIMInstance objects.

        Parameters:

          obj (:class:`~pywbem.CIMClass` or :class:`~pywbem.CIMInstance`):
            Object from which qualifiers are to be removed. It is modified
            in place.
        """
        assert isinstance(obj, (CIMInstance, CIMClass))
        obj.qualifiers = NocaseDict()
        for prop in obj.properties:
            obj.properties[prop].qualifiers = NocaseDict()
        # Only classes carry methods (and method parameters).
        if isinstance(obj, CIMClass):
            for method in obj.methods:
                obj.methods[method].qualifiers = NocaseDict()
                for param in obj.methods[method].parameters:
                    obj.methods[method].parameters[param].qualifiers = \
                        NocaseDict()

    @staticmethod
    def _remove_classorigin(obj):
        """
        Remove all ClassOrigin attributes from the input object. The object
        may be a CIMInstance or CIMClass.

        Used to process the IncludeClassOrigin parameter of CIMInstance and
        CIMClass objects.

        Parameters:

          obj (:class:`~pywbem.CIMClass` or :class:`~pywbem.CIMInstance`):
            Object from which the class_origin attribute is to be removed.
            It is modified in place.
        """
        assert isinstance(obj, (CIMInstance, CIMClass))
        for prop in obj.properties:
            obj.properties[prop].class_origin = None
        if isinstance(obj, CIMClass):
            for method in obj.methods:
                obj.methods[method].class_origin = None

    def get_class(self, namespace, classname, local_only=None,
                  include_qualifiers=None, include_classorigin=None,
                  property_list=None):
        # pylint: disable=invalid-name
        """
        Get class from CIM repository.

        Gets a copy of the class defined by classname from the class store of
        the namespace, removes propagated properties and methods if
        local_only is `True` or `None`, and filters the class based on
        property_list, include_qualifiers and include_classorigin.

        These are the filter actions defined for the GetClass operation in
        :term:`DSP0200` (see: :meth:`pywbem.WBEMConnection.GetClass`).

        Parameters:

          namespace (:term:`string`):
            The name of the CIM namespace in the CIM repository (case
            insensitive). Must not be `None`. Leading or trailing
            slash characters are ignored.

          classname (:term:`string`):
            Name of class to retrieve

          local_only (:class:`py:bool`):
            If `True` or `None`, only properties and methods in this specific
            class are returned. `None` means not supplied by client and the
            normal server default is `True`.
            If `False`, properties and methods from the superclasses
            are included.

          include_qualifiers (:class:`py:bool`):
            If `True` or `None`, include qualifiers. `None` is the server
            default if the parameter is not provided by the client.
            If `False`, do not include qualifiers.

          include_classorigin (:class:`py:bool`):
            If `True`, return the class_origin attributes of properties and
            methods.
            If `False` or `None` (use server default), class_origin attributes
            of properties and methods are not returned

          property_list (list of :term:`string`):
            Properties to be included in returned class. If None, all
            properties are returned. If empty list, no properties are
            returned

        Returns:

          :class:`~pywbem.CIMClass`: Copy of the CIM class in the CIM
          repository if found, filtered in accord with the local_only, etc.
          arguments.

        Raises:

          :exc:`~pywbem.CIMError`: (CIM_ERR_NOT_FOUND) if class not found in
            CIM repository.
          :exc:`~pywbem.CIMError`: (CIM_ERR_INVALID_NAMESPACE) if namespace
            does not exist. NOTE(review): not raised in this method itself;
            presumably comes from the repository's `get_class_store()` -
            confirm.
        """
        class_store = self.cimrepository.get_class_store(namespace)

        # Try to get the target class and create a copy for response
        try:
            klass = class_store.get(classname, copy=True)
        except KeyError as exc:
            raise CIMError(
                CIM_ERR_NOT_FOUND,
                _format("Class {0!A} not found in namespace {1!A}.",
                        classname, namespace)) from exc

        # local_only server default is True so True or None remove properties
        if local_only is True or local_only is None:
            # Deleting items while iterating requires a snapshot via list()
            for pname, prop in list(klass.properties.items()):
                if prop.propagated:
                    del klass.properties[pname]
            for mname, meth in list(klass.methods.items()):
                if meth.propagated:
                    del klass.methods[mname]

        self.filter_properties(klass, property_list)

        # Remove qualifiers if specified.  Note that the server default
        # is to include_qualifiers if include_qualifiers is None
        if include_qualifiers is False:
            self._remove_qualifiers(klass)

        # class_origin default is False so None or False cause removal
        if not include_classorigin:
            self._remove_classorigin(klass)
        return klass

    def class_exists(self, namespace, classname):
        """
        Test if class defined by classname parameter exists in
        CIM repository defined by namespace parameter.

        Returns:

          bool: `True` if class exists and `False` if it does not exist.

        Raises:

          KeyError: Namespace does not exist in the CIM repository
        """
        class_store = self.cimrepository.get_class_store(namespace)
        return class_store.object_exists(classname)

    def _get_instances(self, classname, namespace):
        """
        Get instances of classname from the CIM repository. It does not get
        instances of subclasses.

        Parameters:

          classname (:term:`string` or iterable of :term:`string`):
            Name of the class for which instances will be retrieved. The
            comparison is case insensitive. An iterable of class names is
            also accepted.

          namespace (:term:`string`):
            Repository namespace to search.

        Returns:

          List of the instances in the instance store whose path classname
          matches. The instances are fetched from the store without
          requesting a copy, so they must not be modified.
        """
        # A plain string must be treated as ONE class name. Passing it to a
        # list constructor would split it into single characters and nothing
        # would ever match.
        if isinstance(classname, str):
            wanted = {classname.lower()}
        else:
            wanted = {name.lower() for name in classname}

        instance_store = self.cimrepository.get_instance_store(namespace)
        return [self._get_bare_instance(inst.path, instance_store)
                for inst in instance_store.iter_values()
                if inst.path.classname.lower() in wanted]

    @staticmethod
    def filter_properties(obj, property_list):
        """
        Remove properties from an instance or class that aren't in the
        property_list parameter.

        Parameters:

          obj (:class:`~pywbem.CIMClass` or :class:`~pywbem.CIMInstance`):
            The class or instance from which properties are to be filtered.
            It is modified in place.

          property_list (list of :term:`string`):
            List of properties which are to be included in the result. If
            None, remove nothing. If empty list, remove everything. Else
            remove properties that are not in property_list (case
            insensitive). Duplicated names are allowed in the list and
            ignored.
        """
        if property_list is None:
            return

        wanted = {pname.lower() for pname in property_list}
        # Deleting items while iterating requires a snapshot via list()
        for pname in list(obj.properties):
            if pname.lower() not in wanted:
                del obj.properties[pname]

    @staticmethod
    def _get_bare_instance(instance_name, instance_store, copy=None):
        """
        Find an instance in the CIM repository by `instance_name` and return
        that instance. The `copy` parameter controls whether the original
        instance in the CIM repository is returned or a copy. The only time
        the original should be returned is when the user is certain that the
        returned object WILL NOT be modified.

        Do not modify an instance returned without `copy` set.

        Parameters:

          instance_name (:class:`~pywbem.CIMInstanceName`):
            Instance path of the instance to find in the CIM repository

          instance_store (:class:`~pywbem_mock.BaseObjectStore`):
            Instance store of the CIM repository to search for the instance

          copy (bool):
            Passed through to the store's `get()`. If True a copy of the
            instance is requested; otherwise the instance in the CIM
            repository is returned.

        Returns:

          :class:`~pywbem.CIMInstance`: the complete CIM instance or copy of
          the CIM instance if it is found in the CIM repository, or `None`
          if the instance defined by `instance_name` is not found. No
          filtering of properties, qualifiers, etc. is executed on the
          instance.
        """
        if instance_store.object_exists(instance_name):
            return instance_store.get(instance_name, copy=copy)
        return None

    def is_subclass(self, klass, superclass, class_store):
        """
        Return boolean indicating whether a class is a (direct or indirect)
        subclass of a superclass, or the same class, in the specified class
        store of the CIM repository (i.e. in the namespace of that class
        store).

        This test is done by starting at the class and walking its
        superclasses up to the root, so it is not as expensive as
        determining all subclasses of a given class.

        Parameters:

          klass (:term:`string`):
            Class name of the class.

          superclass (:term:`string`):
            Class name of the superclass.

          class_store (:class:`~pywbem_mock.BaseObjectStore`):
            Class store of the CIM repository to search for the classes.

        Returns:

          bool: Boolean indicating whether the class is a (direct or
          indirect) subclass of the superclass, or the same class.

        Raises:

          KeyError: Class or superclass does not exist in the class store.
        """
        # This lookup is done first in order to catch the case where class
        # and superclass are the same class and both do not exist:
        klass_cls = class_store.get(klass)

        if klass.lower() == superclass.lower():
            # Found it
            return True

        next_klass = klass_cls.superclass
        if next_klass is None:
            # Exhausted, we are at the root class.
            # Check superclass existence (raises KeyError if missing)
            class_store.get(superclass)
            return False

        # Continue searching upwards
        return self.is_subclass(next_klass, superclass, class_store)