Source code for pywbem_mock._wbemconnection_mock

#
# (C) Copyright 2018 InovaDevelopment.com
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
# Author: Karl  Schopmeyer <inovadevelopment.com>
#

"""
Mock support for the WBEMConnection class to allow pywbem users to test the
pywbem client without requiring a running WBEM server.

For documentation, see mocksupport.rst.
"""

from __future__ import absolute_import, print_function

from copy import deepcopy
import uuid
import time
import sys
import locale
import traceback
import re
from xml.dom import minidom
from mock import Mock
import six


from pywbem import WBEMConnection, CIMClass, CIMClassName, \
    CIMInstance, CIMInstanceName, CIMQualifierDeclaration, \
    CIMParameter, cimtype, CIMError, \
    CIM_ERR_NOT_FOUND, CIM_ERR_METHOD_NOT_FOUND, CIM_ERR_FAILED, \
    CIM_ERR_INVALID_SUPERCLASS, CIM_ERR_INVALID_PARAMETER, \
    CIM_ERR_INVALID_CLASS, CIM_ERR_ALREADY_EXISTS, \
    CIM_ERR_INVALID_NAMESPACE, CIM_ERR_INVALID_ENUMERATION_CONTEXT, \
    CIM_ERR_NOT_SUPPORTED, CIM_ERR_QUERY_LANGUAGE_NOT_SUPPORTED, \
    DEFAULT_NAMESPACE, MOFCompiler, MOFWBEMConnection
from pywbem.cim_obj import NocaseDict


__all__ = ['FakedWBEMConnection', 'method_callback_interface']

# Fake Server default values for parameters that apply to repo and operations

# Default Max_Object_Count for Fake Server if not specified by request
_DEFAULT_MAX_OBJECT_COUNT = 100

# Maximum Open... timeout if not set by request
OPEN_MAX_TIMEOUT = 40

# per DSP0200, the default behavior for EnumerateInstance DeepInheritance
# if not set by server.  Default is True.
DEFAULT_DEEP_INHERITANCE = True

# allowed output formats for the repository display
OUTPUT_FORMATS = ['mof', 'xml', 'repr']


# TODO: ks Future We have not considered that iq and ico are deprecated in
# DSP0200 for get_instance, etc. We could  set up a default to ignore these
# parameters for the operations in which they are deprecated and we
# should/could ignore them. We need to document our behavior in relation to the
# spec.

# pylint: disable=invalid-name
STDOUT_ENCODING = getattr(sys.stdout, 'encoding', None)
if not STDOUT_ENCODING:
    STDOUT_ENCODING = locale.getpreferredencoding()
if not STDOUT_ENCODING:
    STDOUT_ENCODING = 'utf-8'
# pylint: enable=invalid-name


def _uprint(dest, text):
    """
    Write text to dest, adding a newline character.

    Text must be a unicode string and must not be None.

    If dest is a file path, the text is encoded to a UTF-8 Byte sequence and
    is appended to the file (opening and closing the file).

    If dest is None, the text is encoded to a codepage suitable for the current
    stdout and is written to stdout.
    """
    if dest is None:
        btext = text.encode(STDOUT_ENCODING, 'replace')
        print(btext)
    else:
        btext = text.encode('utf-8')
        with open(dest, 'a') as f:
            print(btext, file=f)


[docs]def method_callback_interface(conn, objectname, methodname, **params): # pylint: disable=unused-argument, invalid-name, line-too-long """ **Experimental:** *New in pywbem 0.12 as experimental.* Interface for user-provided callback functions for CIM method invocation on a faked connection. Parameters: conn (:class:`~pywbem_mock.FakedWBEMConnection`): Faked connection. This can be used to access the mock repository of the faked connection, via its operation methods (e.g. `GetClass`). objectname (:class:`~pywbem.CIMInstanceName` or :class:`~pywbem.CIMClassName`): The object path of the target object of the invoked method, as follows: * Instance-level call: The instance path of the target instance, as a :class:`~pywbem.CIMInstanceName` object which has its `namespace` property set to the target namespace of the invocation. Its `host` property will not be set. * Class-level call: The class path of the target class, as a :class:`~pywbem.CIMClassName` object which has its `namespace` property set to the target namespace of the invocation. Its `host` property will not be set. methodname (:term:`string`): The CIM method name that is being invoked. This is the method name for which the callback function was registered. This parameter allows a single callback function to be registered for multiple methods. params (:ref:`NocaseDict`): The input parameters for the method that were passed to the `InvokeMethod` operation. Each dictionary item represents a single input parameter for the CIM method and is a :class:`~pywbem.CIMParameter` object, regardless of how the input parameter was passed to the :meth:`~pywbem.WBEMConnection.InvokeMethod` method. The :class:`~pywbem.CIMParameter` object will have at least the following properties set: * name (:term:`string`): Parameter name (case independent). * type (:term:`string`): CIM data type of the parameter. * value (:term:`CIM data type`): Parameter value. Returns: tuple: The callback function must return a tuple consisting of: * return_value (:term:`CIM data type`): Return value for the CIM method. * out_params (:term:`py:iterable` of :class:`~pywbem.CIMParameter`): Each item represents a single output parameter of the CIM method. The :class:`~pywbem.CIMParameter` objects must have at least the following properties set: * name (:term:`string`): Parameter name (case independent). * type (:term:`string`): CIM data type of the parameter. * value (:term:`CIM data type`): Parameter value. Raises: : Since the callback function mocks a CIM method invocation, it should raise :exc:`~pywbem.CIMError` exceptions to indicate failures. Any other exceptions raised by the callback function will be mapped to :exc:`~pywbem.CIMError` exceptions with status CIM_ERR_FAILED and a message that contains information about the exception including a traceback. """ # noqa: E501 # pylint: enable=line-too-long raise NotImplementedError
def _pretty_xml(xml_string): """ Common function to produce pretty xml string from an input xml_string. This function is NOT intended to be used in major code paths since it uses the minidom to produce the prettified xml and that uses a lot of memory """ result_dom = minidom.parseString(xml_string) pretty_result = result_dom.toprettyxml(indent=' ') # remove extra empty lines return re.sub(r'>( *[\r\n]+)+( *)<', r'>\n\2<', pretty_result)
[docs]class FakedWBEMConnection(WBEMConnection): """ **Experimental:** *New in pywbem 0.12 as experimental.* A subclass of :class:`pywbem.WBEMConnection` that mocks the communication with a WBEM server by utilizing a local in-memory *mock repository* to generate responses in the same way the WBEM server would. For a description of the operation methods on this class, see :ref:`Faked WBEM operations`. Each :class:`~pywbem.FakedWBEMConnection` object has its own mock repository which contains multiple CIM namespaces, and each namespace may contain CIM qualifier types (declarations), CIM classes and CIM instances. This class provides only a subset of the init parameters of :class:`~pywbem.WBEMConnection` because it does not have a connection to a WBEM server. It uses a faked and fixed URL for the WBEM server (``http://FakedUrl``) as a means of identifying the connection by users. Logging of the faked operations is supported via the pywbem logging facility and can be controlled in the same way as for :class:`~pywbem.WBEMConnection`. For details, see :ref:`WBEM operation logging`. """ def __init__(self, default_namespace=DEFAULT_NAMESPACE, use_pull_operations=False, stats_enabled=False, response_delay=None, repo_lite=False): """ Parameters: default_namespace (:term:`string`): Default namespace. This parameter has the same characteristics as the same-named init parameter of :class:`~pywbem.WBEMConnection`. use_pull_operations (:class:`py:bool`): Flag to control whether pull or traditional operaitons are used in the iter operations. This parameter has the same characteristics as the same-named init parameter of :class:`~pywbem.WBEMConnection`. stats_enabled (:class:`py:bool`): Flag to enable operation statistics. This parameter has the same characteristics as the same-named init parameter of :class:`~pywbem.WBEMConnection`. response_delay (:term:`number`): Artifically created delay for each operation, in seconds. This must be a positive number. Delays less than a second or other fractional delays may be achieved with float numbers. `None` disables the delay. Note that the :attr:`~pywbem_mock.FakedWBEMConnection.response_delay` property can be used to set this delay subsequent to object creation. repo_lite (:class:`py:bool`): Flag to set the :ref:`operation mode <mock repository operation modes>` of the mock repository. If `True`, lite mode is set. If `False`, full mode is set. """ # Response delay in seconds. Any operation is delayed by this time. # Initialize before superclass init because otherwise logger may # fail with this attribute not found self._response_delay = response_delay super(FakedWBEMConnection, self).__init__( 'http://FakedUrl', default_namespace=default_namespace, use_pull_operations=use_pull_operations, stats_enabled=stats_enabled) # The CIM classes in the mock repository. # This is a dictionary of dictionaries where the top level key is the # CIM namespace name and the keys for each sub-dictionary in a # namespace are class names, and the values in each sub-dictionary are # the CIM classes in that namespace, represented as CIMClass objects. # The dictionaries are NocaseDict since namespaces should be case # insensitive. self.classes = NocaseDict() # The CIM qualifier types in the mock repository. # Same format as for classes above, except that the values in each # sub-dictionary are CIMQualifierDeclaration objects. self.qualifiers = NocaseDict() # The CIM instances in the mock repository. # Because instances do not have a name, the format is slightly # different: This is a dictionary of lists where the top level key is # the CIM namespace name and the value is a list of CIM instances in # that namespace, represented as CIMInstance objects. # TODO: ks. FUTURE maybe we should really have a subdict per class but # it is not important for initial release. self.instances = NocaseDict() self.methods = NocaseDict() self._repo_lite = repo_lite # Open Pull Contexts. The key for each context is an enumeration # context id. The data is the total list of instances/names to # be returned and the current position in the list. Any context in # this list is still open. self.enumeration_contexts = {} self._imethodcall = Mock(side_effect=self._mock_imethodcall) self._methodcall = Mock(side_effect=self._mock_methodcall) @property def response_delay(self): """ :term:`number`: Artifically created delay for each operation, in seconds. If `None`, there is no delay. This attribute is settable. For details, see the description of the same-named constructor parameter. """ return self._response_delay @response_delay.setter def response_delay(self, delay): """Setter method; for a description see the getter method.""" if isinstance(delay, (int, float)) and delay >= 0 or delay is None: self._response_delay = delay else: raise ValueError("Invalid value for response_delay: %r, must be " "a positive number" % delay) def __repr__(self): return '%s(response_delay=%s, WBEMConnection(%r))' % \ (self.__class__.__name__, self.response_delay, super(FakedWBEMConnection, self).__repr__()) ################################################################ # # Methods to insert data into mock repository # ################################################################
[docs] def compile_mof_file(self, mof_file, namespace=None, search_paths=None, verbose=None): """ Compile the MOF definitions in the specified file (and its included files) and add the resulting CIM objects to the specified namespace of the mock repository. If the CIM namespace does not exist, it is created. This method supports all MOF pragmas, and specifically the include pragma. If a CIM class or CIM qualifier type to be added already exists in the target namespace with the same name (comparing case insensitively), this method raises :exc:`~pywbem.CIMError`. If a CIM instance to be added already exists in the target namespace with the same keybinding values, this method raises :exc:`~pywbem.CIMError`. In all cases where this method raises an exception, the mock repository remains unchanged. Parameters: mof_file (:term:`string`): Path name of the file containing the MOF definitions to be compiled. namespace (:term:`string`): The name of the target CIM namespace in the mock repository. This namespace is also used for lookup of any existing or dependent CIM objects. If `None`, the default namespace of the connection is used. search_paths (:term:`py:iterable` of :term:`string`): An iterable of directory path names where MOF dependent files will be looked up. See the description of the `search_path` init parameter of the :class:`~pywbem.MOFCompiler` class for more information on MOF dependent files. verbose (:class:`py:bool`): Controls whether to issue more detailed compiler messages. Raises: IOError: MOF file not found. :exc:`~pywbem.MOFParseError`: Compile error in the MOF. :exc:`~pywbem.CIMError`: Failure related to the CIM objects in the mock repository. """ if namespace is None: namespace = self.default_namespace mofcomp = MOFCompiler(MOFWBEMConnection(), search_paths=search_paths, verbose=verbose) mof_repo = mofcomp.handle self._setup_mof_repo(mof_repo) mofcomp.compile_file(mof_file, namespace) self._merge_repos(mof_repo)
[docs] def compile_mof_string(self, mof_str, namespace=None, search_paths=None, verbose=None): """ Compile the MOF definitions in the specified string and add the resulting CIM objects to the specified namespace of the mock repository. If the CIM namespace does not exist, it is created. This method supports all MOF pragmas, and specifically the include pragma. If a CIM class or CIM qualifier type to be added already exists in the target namespace with the same name (comparing case insensitively), this method raises :exc:`~pywbem.CIMError`. If a CIM instance to be added already exists in the target namespace with the same keybinding values, this method raises :exc:`~pywbem.CIMError`. In all cases where this method raises an exception, the mock repository remains unchanged. Parameters: mof (:term:`string`): A string with the MOF definitions to be compiled. namespace (:term:`string`): The name of the target CIM namespace in the mock repository. This namespace is also used for lookup of any existing or dependent CIM objects. If `None`, the default namespace of the connection is used. search_paths (:term:`py:iterable` of :term:`string`): An iterable of directory path names where MOF dependent files will be looked up. See the description of the `search_path` init parameter of the :class:`~pywbem.MOFCompiler` class for more information on MOF dependent files. verbose (:class:`py:bool`): Controls whether to issue more detailed compiler messages. Raises: IOError: MOF file not found. :exc:`~pywbem.MOFParseError`: Compile error in the MOF. :exc:`~pywbem.CIMError`: Failure related to the CIM objects in the mock repository. """ if namespace is None: namespace = self.default_namespace # TODO:ks Future we might be able to use our own MOFWBEMRepository to # directly insert into our repository instead of copying them # after the compile. mofcomp = MOFCompiler(MOFWBEMConnection(), search_paths=search_paths, verbose=verbose) mof_repo = mofcomp.handle self._setup_mof_repo(mof_repo) mofcomp.compile_string(mof_str, namespace) self._merge_repos(mof_repo)
[docs] def add_cimobjects(self, objects, namespace=None): # pylint: disable=line-too-long """ Add CIM classes, instances and/or CIM qualifier types (declarations) to the specified CIM namespace of the mock repository. This method adds a copy of the objects presented so that the user may modify the objects without impacting the repository. If the CIM namespace does not exist, it is created. The method imposes very few limits on the objects added. It does require that the superclass exist for any class added and that instances added include a path component. If a CIM class or CIM qualifier type to be added already exists in the target namespace with the same name (comparing case insensitively), this method fails, and the mock repository remains unchanged. If a CIM instance to be added already exists in the target namespace with the same keybinding values, this method fails, and the mock repository remains unchanged. Parameters: objects (:class:`~pywbem.CIMClass` or :class:`~pywbem.CIMInstance` or :class:`~pywbem.CIMQualifierDeclaration`, or list of them): CIM object or objects to be added to the mock repository. The list may contain different kinds of CIM objects. namespace (:term:`string`): The name of the target CIM namespace in the mock repository. This namespace is also used for lookup of any existing or dependent CIM objects. If `None`, the default namespace of the connection is used. Raises: ValueError: Invalid input CIM object in `objects` parameter. TypeError: Invalid type in `objects` parameter. """ # noqa: E501 # pylint: enable=line-too-long if namespace is None: namespace = self.default_namespace if isinstance(objects, list): for obj in objects: self.add_cimobjects(obj, namespace=namespace) else: obj = objects if isinstance(obj, CIMClass): cc = deepcopy(obj) if cc.superclass: if not self._class_exists(cc.superclass, namespace): raise ValueError('Class %s defines superclass %s but ' 'the superclass does not exist in the ' 'repository.' % (cc.classname, cc.superclass)) try: # The following generates an exception for each new ns self.classes[namespace][cc.classname] = cc except KeyError: self.classes[namespace] = NocaseDict({cc.classname: cc}) elif isinstance(obj, CIMInstance): inst = deepcopy(obj) if inst.path is None: raise ValueError("Instances added must include a path. " "Instance %r does not include a path" % inst) if inst.path.namespace is None: inst.path.namespace = namespace if inst.path.host is not None: inst.path.host = None try: inst_repo = self._get_instance_repo(namespace) if self._find_instance(inst.path, inst_repo)[1] is not None: raise ValueError('The instance %s already exists in ' 'namespace %s' % (inst, namespace)) self.instances[namespace].append(inst) except CIMError as ce: if ce.status_code == CIM_ERR_INVALID_NAMESPACE: self.instances[namespace] = [inst] else: raise CIMError(CIM_ERR_FAILED, 'Internal failure of ' 'add_cimobject operation. Rcvd ' ' CIMError %s' % ce) elif isinstance(obj, CIMQualifierDeclaration): qual = deepcopy(obj) try: self.qualifiers[namespace][qual.name] = qual except KeyError: self.qualifiers[namespace] = NocaseDict({qual.name: qual}) else: assert False, 'Object to add_cimobjects. %s invalid type' \ % type(obj)
[docs] def add_method_callback(self, classname, methodname, method_callback, namespace=None,): """ Register a callback function for a CIM method that will be called when the CIM method is invoked via `InvokeMethod`. Parameters: classname (:term:`string`): The CIM class name for which the callback function is registered. The faked `InvokeMethod` implementation uses this information to look up the callback function from its parameters. For method invocations on a target instance, this must be the class name of the creation class of the target instance. For method invocations on a target class, this must be the class name of the target class. methodname (:term:`string`): The CIM method name for which the callback function is registered. The faked `InvokeMethod` implementation uses this information to look up the callback function from its parameters. method_callback (:func:`~pywbem_mock.method_callback_interface`): The callback function. namespace (:term:`string`): The CIM namespace for which the callback function is registered. If `None`, the callback function is registered for the default namespace of the connection. The faked `InvokeMethod` implementation uses this information to look up the callback function from its parameters. """ if namespace is None: namespace = self.default_namespace if namespace not in self.methods: self.methods[namespace] = NocaseDict() if classname not in self.methods[namespace]: self.methods[namespace][classname] = NocaseDict() if methodname in self.methods[namespace][classname]: raise ValueError("Duplicate method specification") self.methods[namespace][classname][methodname] = method_callback
[docs] def display_repository(self, namespaces=None, dest=None, summary=False, output_format='mof'): """ Display the namespaces and objects in the mock repository in one of multiple formats to a destination. Parameters: namespaces (:term:`string` or list of :term:`string`): Limits display output to the specified CIM namespace or namespaces. If `None`, all namespaces of the mock repository are displayed. dest (:term:`string`): File path of the output file. If `None`, the output is written to stdout. summary (:class:`py:bool`): Flag for summary mode. If `True`, only a summary count of CIM objects in the specified namespaces of the mock repository is produced. If `False`, both the summary count and the details of the CIM objects are produced. output_format (:term:`string`): Output format, one of: 'mof', 'xml', or 'repr'. """ if output_format == 'mof': cmt_begin = '# ' cmt_end = '' elif output_format == 'xml': cmt_begin = '<!-- ' cmt_end = ' ->' else: cmt_begin = '' cmt_end = '' if output_format not in OUTPUT_FORMATS: raise ValueError('Invalid output format definition %s. ' '%s are valid.' % (output_format, OUTPUT_FORMATS)) _uprint(dest, '%s========Mock Repo Display fmt=%s namespaces=%s ' '=========%s\n' % (cmt_begin, output_format, ('all' if namespaces is None else namespaces), cmt_end)) # get all namespaces repo_ns = [] if self.classes: repo_ns.extend([ns for ns in self.classes]) if self.instances: repo_ns.extend([ns for ns in self.instances]) if self.qualifiers: repo_ns.extend([ns for ns in self.qualifiers]) if self.methods: repo_ns.extend([ns for ns in self.methods]) repo_nss = set(repo_ns) if namespaces: if isinstance(namespaces, six.string_types): namespaces = [namespaces] repo_nss = repo_nss.intersection(set(namespaces)) repo_nss = sorted(repo_nss) for ns in repo_nss: _uprint(dest, '\n%sNAMESPACE %s%s\n' % (cmt_begin, ns, cmt_end)) self._display_objects('Qualifier Declarations', self.qualifiers, ns, cmt_begin, cmt_end, dest=dest, summary=summary, output_format=output_format) self._display_objects('Classes', self.classes, ns, cmt_begin, cmt_end, dest=dest, summary=summary, output_format=output_format) self._display_objects('Instances', self.instances, ns, cmt_begin, cmt_end, dest=dest, summary=summary, output_format=output_format) self._display_objects('Methods', self.methods, ns, cmt_begin, cmt_end, dest=dest, summary=summary, output_format=output_format) _uprint(dest, '============End Repository=================')
@staticmethod def _display_objects(obj_type, objects_repo, namespace, cmt_begin, cmt_end, dest=None, summary=None, output_format=None): """ Display a set of objects of obj_type from the dictionary defined by the parameter objects_dict. obj_type is a string that defines the type of object (instance, class, qualifier declaration). """ # TODO:ks FUTURE Consider sorting to perserve order of compile/add. if namespace in objects_repo: if obj_type == 'Methods': _uprint(dest, '%sNamespace %s: contains %s %s:%s\n' % (cmt_begin, namespace, len(objects_repo[namespace]), obj_type, cmt_end)) else: _uprint(dest, '%sNamespace %s: contains %s %s %s\n' % (cmt_begin, namespace, len(objects_repo[namespace]), obj_type, cmt_end)) if summary: return # instances are special because the inner struct is a list if obj_type == 'Instances': try: insts = objects_repo[namespace] except KeyError: return # TODO:ks Future: Possibly sort insts by path order. for inst in insts: if output_format == 'xml': _uprint(dest, '%s Path=%s %s\n%s' % (cmt_begin, inst.path.to_wbem_uri(), cmt_end, _pretty_xml(inst.tocimxmlstr()))) elif output_format == 'repr': _uprint(dest, 'Path:\n%r\nInst:\n%r\n' % (inst.path, inst)) else: _uprint(dest, '%s Path=%s %s\n%s' % (cmt_begin, inst.path.to_wbem_uri(), cmt_end, inst.tomof())) elif obj_type == 'Methods': try: methods = objects_repo[namespace] except KeyError: return for cln in methods: for method in methods[cln]: _uprint(dest, '%sClass: %s, method: %s, ' 'callback: %s %s' % (cmt_begin, cln, method, methods[cln][method].__name__, cmt_end)) else: # Covers QualifierDeclarations and Classes try: objs = objects_repo[namespace] except KeyError: return for key in sorted(objs): obj = objs[key] if output_format == 'xml': _uprint(dest, _pretty_xml(obj.tocimxmlstr())) elif output_format == 'repr': _uprint(dest, '%r' % obj) else: _uprint(dest, obj.tomof()) def _get_inst_repo(self, namespace=None): """ Test support method that returns instances from the repository with no processing. It uses the default namespace if input parameter for namespace is None """ if namespace is None: namespace = self.default_namespace return self.instances[namespace] def _setup_mof_repo(self, repo): """ Move our repo to the mofcompile repo to provide a basis for the compile. """ repo.classes = deepcopy(self.classes) repo.qualifiers = deepcopy(self.qualifiers) repo.instances = deepcopy(self.instances) def _merge_repos(self, repo): """ Move objects from the repo repository to the self repository. Since the setup copied all existing objects to the compile repo, this clears the repository and them copies all of them back. """ if repo.classes: self.classes.clear() for ns in repo.classes: for cl in six.itervalues(repo.classes[ns]): try: self.classes[ns][cl.classname] = \ repo.classes[ns][cl.classname].copy() except KeyError: self.classes[ns] = NocaseDict({cl.classname: cl}) if repo.instances: self.instances.clear() for ns, insts in six.iteritems(repo.instances): for inst in insts: if inst.path is None: # use GetClass to get all properties cc = self.GetClass(inst.classname, namespace=ns, LocalOnly=False, IncludeQualifiers=True, IncludeClassOrigin=True) inst.path = CIMInstanceName.from_instance(cc, inst, ns) try: self.instances[ns].append(inst) except KeyError: self.instances[ns] = [inst] if repo.qualifiers: self.qualifiers.clear() for ns in repo.qualifiers: for qual in six.itervalues(repo.qualifiers[ns]): try: self.qualifiers[ns][qual.name] = \ repo.qualifiers[ns][qual.name].copy() except KeyError: self.qualifiers[ns] = NocaseDict({qual.name: qual}) ########################################################## # # Functions Mocked. WBEMConnection only mocks the WBEMConnection # _imethodcall and _methodcall methods. This captures all calls # to the wbem server. # ########################################################## def _mock_imethodcall(self, methodname, namespace, response_params_rqd=None, **params): # pylint: disable=unused-argument """ Mocks the WBEMConnection._imethodcall() method. This mock calls methods within this class that fake the processing in a WBEM server (at the CIM Object level) for the varisous CIM/XML methods and return. Each function is named with the lower case method namd prepended with '_fake_'. """ method_name = '_fake_' + methodname.lower() method_name = getattr(self, method_name) result = method_name(namespace, **params) # sleep for defined number of seconds if self._response_delay: time.sleep(self._response_delay) return result def _mock_methodcall(self, methodname, localobject, Params=None, **params): # pylint: disable=invalid-name """ Mocks the WBEMConnection._methodcall() method. This calls the server execution function of extrinsic methods (InvokeMethod). """ result = self._fake_invokemethod(methodname, localobject, Params, **params) # Sleep for defined number of seconds if self._response_delay: time.sleep(self._response_delay) return result ##################################################################### # # Common methods that the Fake... WBEMConnection methods use to # to communicate with the Mock repository. These are generally # private methods. # ##################################################################### def _class_exists(self, classname, namespace): """ Test if class defined by classname parameter exists in repository defined by namespace parameter. Returns True if class exists and False if it does not exist. Exception if the namespace does not exist """ class_repo = self._get_class_repo(namespace) return True if classname in class_repo else False @staticmethod def _make_tuple(rtn_value): """ Make the return value into a tuple in accord with _imethodcall """ return [("IRETURNVALUE", {}, rtn_value)] @staticmethod def _remove_qualifiers(obj): """ Remove all qualifiers from the input objectwhere the object may be an CIMInstance or CIMClass. Removes qualifiers from the object and from properties, methods, and parameters This is used to process the IncludeQualifier parameter for classes and instances """ assert isinstance(obj, (CIMInstance, CIMClass)) obj.qualifiers = NocaseDict() for prop in obj.properties: obj.properties[prop].qualifiers = NocaseDict() if isinstance(obj, CIMClass): for method in obj.methods: obj.methods[method].qualifiers = NocaseDict() for param in obj.methods[method].parameters: obj.methods[method].parameters[param].qualifiers = \ NocaseDict() @staticmethod def _remove_classorigin(obj): """ Remove all ClassOrigin attributes from the input object. The object may be a CIMInstance or CIMClass. Used to process the IncludeClassOrigin parameter of requests """ assert isinstance(obj, (CIMInstance, CIMClass)) for prop in obj.properties: obj.properties[prop].class_origin = None if isinstance(obj, CIMClass): for method in obj.methods: obj.methods[method].class_origin = None @staticmethod def _validate_repo(namespace, repo_dict, repo_type): """ Common method to validate existence of namespace for defined repo_dict. Returns the dictionary for this namespace if valid """ if namespace not in repo_dict: raise CIMError(CIM_ERR_INVALID_NAMESPACE, 'Namespace %s not found for %s' % (namespace, repo_type)) return repo_dict[namespace] def _get_class_repo(self, namespace): """ Validates that the class repository for the input namespaces exists and if it does, returns the handle to that repository. If the repo for namespace does not exist, it generates a CIM_Error The class repository is a NocaseDict with class as key and the CIMClass as value. Parameters: namespace(:term:`string`): String containing the name of the namespace to get Returns: Dictionary containing classes that have been inserted into the repository Raises: CIM_Error, CIM_ERR_INVALID_NAMESPACE if this namespace does not exist in the classrepository """ return self._validate_repo(namespace, self.classes, 'classes') def _get_instance_repo(self, namespace): """ Validates that the instance repository for the input namespaces exists and if it does, returns the handle to that repository. If the repo for namespace does not exist, it generates a CIM_Error The instance repository is a list if instances within the defined namespace Parameters: namespace(:term:`string`): String containing the name of the namespace to get Returns: List of instances Raises: CIM_Error, CIM_ERR_INVALID_NAMESPACE if this namespace does not exist in the classrepository """ if namespace not in self.instances: # create empty instance repo if there is a class repo for the # namespace. Existence of the class repo should imply existence # of instance repo if self._get_class_repo(namespace): self.instances[namespace] = [] if namespace not in self.methods: self.methods[namespace] = NocaseDict() return self._validate_repo(namespace, self.instances, 'instances') def _get_qualifier_repo(self, namespace): """ Validates that the qualifier repository for the input namespaces exists and if it does, returns the handle to that repository. If the repo for namespace does not exist, it generates a CIM_Error The instance repository is a list if instances within the defined namespace Parameters: namespace(:term:`string`): String containing the name of the namespace to get Returns: Dictionary of QualifierDeclaration objects in the repo Raises: CIM_Error: CIM_ERR_INVALID_NAMESPACE if this namespace does not exist in the classrepository """ return self._validate_repo(namespace, self.qualifiers, 'qualifiers') def _get_methods_repo(self, namespace=None): """ Validate that the table of methods exists for this namespace and if it does, return the handle to the dictionary that represents the methods defined for this namespce. If that repo does not exist, generate a CIM_Error. """ return self._validate_repo(namespace, self.methods, 'methods') def _get_superclassnames(self, cn, namespace): """ Get list of superclasses names from the class repository for the defined classname in the namespace. Returns in order of descending class hiearchy. """ class_repo = self._get_class_repo(namespace) superclass_names = [] if cn is not None: cnwork = cn while cnwork: cnsuper = class_repo[cnwork].superclass if cnsuper: superclass_names.append(cnsuper) cnwork = cnsuper superclass_names.reverse() return superclass_names def _get_subclass_names(self, classname, namespace, deep_inheritance): """ Get class names that are subclasses of the classname input parameter from the repository. If DeepInheritance is False, get only classes in the repository for the defined namespace for which this class is a direct super class. If deep_inheritance is True, get all direct and indirect subclasses. If false, get only a the next level of the hiearchy. Returns: list of strings defining the subclass names. """ if isinstance(classname, CIMClassName): classname = classname.classname # retrieve first level of subclasses for which classname is superclass if classname is None: rtn_classnames = [ cl.classname for cl in six.itervalues(self.classes[namespace]) if cl.superclass is None] else: rtn_classnames = [ cl.classname for cl in six.itervalues(self.classes[namespace]) if cl.superclass and cl.superclass.lower() == classname.lower()] # recurse for futher levels of class hiearchy if deep_inheritance: subclass_names = [] if rtn_classnames: for cn in rtn_classnames: subclass_names.extend( self._get_subclass_names(cn, namespace, deep_inheritance)) rtn_classnames.extend(subclass_names) return rtn_classnames def _get_class(self, classname, namespace, local_only=None, include_qualifiers=None, include_classorigin=None, property_list=None): # pylint: disable=invalid-name """ Get class from repository. Gets the class defined by classname from the repository, creates a copy, expands the copied class to include superclass properties if not localonly, and filters the class based on propertylist and includeClassOrigin. It also sets the propagated attribute. Parameters: classname (:term:`string`): Name of class to retrieve namespace (:term:`string`): Namespace from which to retrieve the class local_only (:class:`py:bool`): If True, only properties and methods in this specific class are returned. Otherwise properties and methods from the superclasses are included. include_qualifiers (:class:`py:bool`): If True, include qualifiers. Otherwise remove all qualifiers include_classorigin (:class:`py:bool`): If True return the class_origin attributes of properties and methods. property_list (): Properties to be included in returned class. If None, all properties are returned. If empty, no properties are returned Returns: Copy of the class if found with superclass properties installed and filtered per the keywords in params. Raises: CIMError: (CIM_ERR_NOT_FOUND) if class Not found in repository or CIMError: (CIM_ERR_INVALID_NAMESPACE) if namespace does not exist """ classes_repo = self._get_class_repo(namespace) # try to get the target class and create a copy for response try: cc = deepcopy(classes_repo[classname]) # local properties and methods are marked not propagated. for prop in cc.properties.values(): prop.propagated = False for method in cc.properties.values(): method.propagated = False except KeyError: raise CIMError(CIM_ERR_NOT_FOUND, 'Class %s not found in namespace ' '%s.' % (classname, namespace)) if not local_only and cc.superclass: sc_name = cc.superclass super_class = None while sc_name: try: super_class = classes_repo[sc_name] except KeyError: cx = cc if super_class is None else super_class raise CIMError(CIM_ERR_INVALID_SUPERCLASS, 'Class %s has invalid superclass %s.' % (cx.classname, sc_name)) for prop in super_class.properties.values(): if prop.name not in cc.properties: cc.properties[prop.name] = deepcopy(prop) cc.properties[prop.name].propagated = True for meth in super_class.methods.values(): if meth.name not in cc.methods: cc.methods[meth.name] = deepcopy(meth) cc.methods[meth.name].propagated = True sc_name = super_class.superclass self._filter_properties(cc, property_list) if not include_qualifiers: self._remove_qualifiers(cc) if not include_classorigin: self._remove_classorigin(cc) return cc def _get_association_classes(self, namespace): """ Return list of associator classes from the class repo Returns the classes that have associations qualifier. Does NOT copy so these are what is in repository. User functions MUST NOT modify these classes. Returns: Returns generator where each yield returns a singe association class """ class_repo = self._get_class_repo(namespace) # associator_classes = [] for cl in six.itervalues(class_repo): if 'Association' in cl.qualifiers: yield cl return @staticmethod def _find_instance(iname, inst_repo): """ Find an instance in the instance repo by iname and return the index of that instance. Parameters: iname: CIMInstancename to find inst_repo: the instance repo to search Return (None, None if not found. Otherwise return tuple of index, instance Raises: CIMError: Failed if repo invalid. """ rtn_inst = None rtn_index = None for index, inst in enumerate(inst_repo): if iname == inst.path: if rtn_inst is not None: # TODO:ks Future Remove dup test since we should be # insuring no dups on instance creation raise CIMError(CIM_ERR_FAILED, 'Invalid Repository. ' 'Multiple instances with same path %s' % rtn_inst.path) rtn_inst = inst rtn_index = index return(rtn_index, rtn_inst) def _get_instance(self, iname, namespace, property_list, local_only, include_class_origin, include_qualifiers): """ Local method implements getinstance. This is generally used by other instance methods that need to get an instance from the repository. It attempts to get the instance, copies it, and filters it for input parameters like localonly, includequalifiers, and propertylist. Returns: CIMInstance copy from the repository with property_list filtered, and qualifers removed if include_qualifiers=False and class origin removed if include_class_origin False """ inst_repo = self._get_instance_repo(namespace) rtn_tup = self._find_instance(iname, inst_repo) inst = rtn_tup[1] if inst is None: raise CIMError(CIM_ERR_NOT_FOUND, 'Instance not found in repository namespace %s. ' 'Path=%s' % (namespace, iname)) rtn_inst = deepcopy(inst) # If local_only remove properties where class_origin # differs from class of target instance if local_only: for p in rtn_inst: class_origin = rtn_inst.properties[p].class_origin if class_origin and class_origin != inst.classname: del rtn_inst[p] # if not repo_lite test against class properties if not self._repo_lite and local_only: # gets class propertylist which may be local only or all # superclasses try: cl = self._get_class(iname.classname, namespace, local_only=local_only) except CIMError as ce: if ce.status_code == CIM_ERR_NOT_FOUND: raise CIMError(CIM_ERR_INVALID_CLASS, 'Class %s not found ' ' for instance %s in namespace %s.' % (iname.classname, iname, namespace)) class_pl = cl.properties.keys() for p in list(rtn_inst): if p not in class_pl: del rtn_inst[p] self._filter_properties(rtn_inst, property_list) if not include_qualifiers: self._remove_qualifiers(rtn_inst) if not include_class_origin: self._remove_classorigin(rtn_inst) return rtn_inst def _get_subclass_list_for_enums(self, classname, namespace): """ Get class list (i.e names of subclasses for classname for the enumerateinstance methods. If conn.lite returns only classname but no subclasses. Returns NocaseDict where only the keys are important, This allows case insensitive matches of the names with Python "for cln in clns". """ if self._repo_lite: return NocaseDict({classname: classname}) if not self._class_exists(classname, namespace): raise CIMError(CIM_ERR_INVALID_CLASS, 'Class %r not found ' 'in namespace %r.' % (classname, namespace)) if not self.classes: return NocaseDict() clnslist = self._get_subclass_names(classname, namespace, True) clnsdict = NocaseDict() for cln in clnslist: clnsdict[cln] = cln clnsdict[classname] = classname return clnsdict @staticmethod def _filter_properties(obj, property_list): """ Remove properties from an instance or class that aren't in the plist parameter obj(:class:`~pywbem.CIMClassName` or :class:`~pywbem.CIMClassName): The class or instance from which properties are to be filtered property_list(list of :term:`string`): List of properties which are to be included in the result. If None, remove nothing. If empty list, remove everything. else remove properties that are not in property_list. Duplicated names are allowed in the list and ignored. """ if property_list is not None: property_list = [p.lower() for p in property_list] for pname in obj.properties.keys(): if pname.lower() not in property_list: del obj.properties[pname] ##################################################################### # # Faked WBEMConnection operation methods. # All the methods are named _fake_<methodname> and # are responders that emulate the server response. # # This is all the WBEMConnection methods that communicate with # a WBEMServer. # ###################################################################### def _fake_enumerateclasses(self, namespace, **params): """ Implements a mock server responder for :meth:`~pywbem.WBEMConnection.EnumerateClasses`. Enumerate classes from class repository. If classname parameter exists, use it as the starting point for the hiearchy to get subclasses. """ self._get_class_repo(namespace) classname = params.get('ClassName', None) if classname: assert(isinstance(classname, CIMClassName)) if not self._class_exists(classname.classname, namespace): raise CIMError(CIM_ERR_INVALID_CLASS, 'The class %r defined by "ClassName" parameter ' 'does not exist in namespace %r' % (classname, namespace)) clns = self._get_subclass_names(classname, namespace, params['DeepInheritance']) # Note: _get_class will return NOT_FOUND if the class not in the # repo but it was just found by _get_subclass_names so that would # probably be some form of repo corruption. classes = [ self._get_class(cn, namespace, local_only=params['LocalOnly'], include_qualifiers=params['IncludeQualifiers'], include_classorigin=params['IncludeClassOrigin']) for cn in clns] return self._make_tuple(classes) def _fake_enumerateclassnames(self, namespace, **params): """ Implements a mock server responder for :meth:`~pywbem.WBEMConnection.EnumerateClassNames`. Enumerates the classnames of the classname in the 'classname' parameter or from the top of the tree if 'classname is None. Returns: return tuple including list of classnames Raises: CIMError: CIM_ERR_INVALID_NAMESPACE if invalid namespace, CIMError: CIM_ERR_NOT_FOUND if Classname not found """ clns = self._get_subclass_names(params.get('ClassName', None), namespace, params['DeepInheritance']) rtn_clns = [ CIMClassName(cn, namespace=namespace, host=self.host) for cn in clns] return self._make_tuple(rtn_clns) def _fake_getclass(self, namespace, **params): """ Implements a mock server responder for :meth:`~pywbem.WBEMConnection.GetClass Retrieve a CIM class from the local repository. For a description of the parameters, see :meth:`pywbem.WBEMConnection.GetClass`. """ self._get_class_repo(namespace) cname = params['ClassName'].classname cc = self._get_class(cname, namespace, local_only=params['LocalOnly'], include_qualifiers=params['IncludeQualifiers'], include_classorigin=params['IncludeClassOrigin'], property_list=params['PropertyList']) return self._make_tuple([cc]) @staticmethod def _test_qualifier_decl(qualifier, qual_repo, namespace): """ Test that qualifier is in repo and valid. For for conn_lite, ignore this test """ if qual_repo is None: return if qualifier.name not in qual_repo: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Qualifier ' ' declaration %s required by CreateClass not found ' ' in namespace %s' % (qualifier.name, namespace)) def _fake_createclass(self, namespace, **params): """ Implements a mock server responder for :meth:`~pywbem.WBEMConnection.CreateClass` Creates a new class in the repository. Nothing is returned. Emulates WBEMConnection.CreateClass(...)) If the class repository for this namespace does not exist, this method creates it. Classes that are in the repository contain only the properties in the new_class, not any properties from inheritated classes. The corresponding _get_class resolves any inherited properties to create a complete class with both local and inherited properties. Raises: CIMError: CIM_ERR_INVALID_SUPERCLASS if superclass specified bu does not exist CIMError: CIM_ERR_INVALID_PARAMETER if NewClass parameter not a class CIMError: CIM_ERR_ALREADY_EXISTS if class already exists """ new_class = params['NewClass'] if not isinstance(new_class, CIMClass): raise CIMError(CIM_ERR_INVALID_PARAMETER, 'NewClass not valid CIMClass. Rcvd type: %s' % type(new_class)) if namespace not in self.classes: self.classes[namespace] = NocaseDict({}) if namespace not in self.methods: self.methods[namespace] = NocaseDict() if new_class.classname in self.classes[namespace]: raise CIMError(CIM_ERR_ALREADY_EXISTS, 'Class %s already exists in namespace %s.' % (new_class.classname, namespace)) # If there is a superclass defined, test existence and test for # overridden properties new_class = deepcopy(new_class) if new_class.superclass: try: sc = self._get_class(new_class.superclass, # noqa: F841 namespace=namespace, local_only=False, include_qualifiers=True, include_classorigin=True) except CIMError as ce: if ce.status_code == CIM_ERR_NOT_FOUND: raise CIMError(CIM_ERR_INVALID_SUPERCLASS, 'Superclass %s ' ' for class % snot found in namespace %s' % (new_class.superclass, new_class.classname, namespace)) else: raise for pname, prop in six.iteritems(new_class.properties): if pname in sc.properties: # TODO: should we use prop name from qualifier in override? if 'Override' not in new_class.properties[pname].qualifiers: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Property %s duplicates property in %s ' ' without override' % (pname, sc.classname)) else: sp = sc.properties[pname] if sp.type != prop.type \ or sp.is_array != prop.is_array \ or sp.embedded_object != prop.embedded_object: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Invalid new_class property %s. ' 'Does not match overridden ' 'property in class %s' % (pname, sc.classname)) for mname, method in six.iteritems(new_class.methods): if mname in sc.methods: if 'Override' not in new_class.method.qualifiers: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Method %s duplicates method in %s ' ' without override' % (mname, sc.classname)) else: super_meth = sc.method[mname] if super_meth.type != method.type \ or super_meth.is_array != method.is_array \ or super_meth.return_type != method.return_type: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Invalid new_class method %s. ' 'Does not match overridden ' 'method in class %s' % (mname, sc.classname)) # TODO: match qualifiers in override. # TODO: test class qualifiers. # Set class_origin and propagated in the new class and its elements. if self._repo_lite: qual_repo = None else: association_class = 'Association' in new_class.qualifiers qual_repo = self._get_qualifier_repo(namespace) for qual in six.itervalues(new_class.qualifiers): qual.propagated = False self._test_qualifier_decl(qual, qual_repo, namespace) for prop in six.itervalues(new_class.properties): if not association_class and prop.type == 'reference': raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Reference property %s not allowed on ' 'non-association class %s' % (prop.name, new_class.classname)) prop.class_origin = new_class.classname prop.propagated = False for qual in six.itervalues(prop.qualifiers): self._test_qualifier_decl(qual, qual_repo, namespace) qual.propagated = False for method in six.itervalues(new_class.methods): method.class_origin = new_class.classname method.propagated = False for qual in six.itervalues(method.qualifiers): self._test_qualifier_decl(qual, qual_repo, namespace) qual.propagated = False for param in six.itervalues(method.parameters): for qual in six.itervalues(param.qualifiers): self._test_qualifier_decl(qual, qual_repo, namespace) param.qualifiers.propagated = False self.classes[namespace][new_class.classname] = new_class def _fake_modifyclass(self, namespace, **params): """ Currently not implemented Implements a mock server responder for :meth:`~pywbem.WBEMConnection.MmodifyClass` Modifies a new class in the repository. Nothing is returned. Emulates WBEMConnection.CreateClass(...)) if the class repository for this namespace does not exist, this method creates it. Raises: CIMError: CIM_ERR_NOT_SUPPORTED """ print('ModifyClass not supported %s %s' % (namespace, params)) self._get_class_repo(namespace) raise CIMError(CIM_ERR_NOT_SUPPORTED, 'Currently ModifyClass not ' 'supported in ' 'Fake_WBEMConnection') def _fake_deleteclass(self, namespace, **params): """ Implements a mock server responder for :meth:`~pywbem.WBEMConnection.DeleteClass` Delete a class in the class repository if it exists. Emulates WBEMConnection.DeleteClass(...)) This is simplistic in that it ignores issues like existing subclasses and existence of instances. Nothing is returned. Raises: CIMError: CIM_ERR_NOT_FOUND if ClassName defines class not in repository """ class_repo = self._get_class_repo(namespace) cname = params['ClassName'].classname try: class_repo[cname] except KeyError: raise CIMError(CIM_ERR_NOT_FOUND, 'Class %s in namespace %s' 'not in repository. Not deleted.' % (cname, namespace)) classnames = self._get_subclass_names(cname, namespace, True) classnames.append(cname) # delete all instances and names in this class and subclasses for clname in classnames: if self.instances: inst_names = self.EnumerateInstanceNames(clname, namespace) for iname in inst_names: self.DeleteInstance(iname) del class_repo[clname] ########################################################## # # Faked Qualifier methods # ########################################################### def _fake_enumeratequalifiers(self, namespace, **params): # pylint: disable=unused-argument """ Imlements a mock server responder for :meth:`~pywbem.WBEMConnection.EnumerateQualifiers` Enumerates the qualifier declarations in the local repository of this namespace. """ qualifier_repo = self._get_qualifier_repo(namespace) qualifiers = list(qualifier_repo.values()) return self._make_tuple(qualifiers) def _fake_getqualifier(self, namespace, **params): """ Implements a server responder for :meth:`pywbem.WBEMConnection.GetQualifier`. Retrieves a qualifier declaration from the local repository of this namespace. Returns: Returns a tuple representing the _imethodcall return for this method where the data is a QualifierDeclaration Raises: CIMError: CIM_ERR_INVALID_NAMESPACE CIMError: CIM_ERR_NOT_FOUND """ qualifier_repo = self._get_qualifier_repo(namespace) qname = params['QualifierName'] try: qualifier = qualifier_repo[qname] return self._make_tuple([qualifier]) except KeyError: ce = CIMError(CIM_ERR_NOT_FOUND, 'Qualifier declaration %s not found in namespace %s.' % (qname, namespace)) raise ce def _fake_setqualifier(self, namespace, **params): """ Implements a server responder for :meth:`pywbem.WBEMConnection.SetQualifier`. Create or modify a qualifier type in the local repository of this class. This method will create a new namespace for the qualifier if none is defined. Raises: CIMError: CIM_ERR_INVALID_PARAMETER CIMError: CIM_ERR_ALREADY_EXISTS """ qual = params['QualifierDeclaration'] # TODO:ks FUTURE implement set... method for instance, qualifier, class # as general means to put new data into the repo. if namespace not in self.qualifiers: self.qualifiers[namespace] = NocaseDict({}) if not isinstance(qual, CIMQualifierDeclaration): raise CIMError(CIM_ERR_INVALID_PARAMETER, 'QualifierDeclaration parameter is not a ' 'valid CIMQualifierDeclaration. Rcvd type: %s' % type(qual)) if qual.name in self.qualifiers[namespace]: raise CIMError(CIM_ERR_ALREADY_EXISTS, 'Qualifier declaration %s already exists in ' 'namespace %s.' % (qual.name, namespace)) try: self.qualifiers[namespace][qual.name] = qual except KeyError: self.qualifiers[namespace] = NocaseDict({qual.name: qual}) def _fake_deletequalifier(self, namespace, **params): """ Implements a server responder for :meth:`~pywbem.WBEMConnection.DeleteQualifier` Deletes a single qualifier if it is in the repository for this class and namespace Raises; CIMError: CIM_ERR_INVALID_NAMESPACE, CIM_ERR_NOT_FOUND """ qualifier_repo = self._get_qualifier_repo(namespace) qname = params['QualifierName'] if qname in qualifier_repo: del qualifier_repo[qname] else: raise CIMError(CIM_ERR_NOT_FOUND, 'QualifierDeclaration %s not found in ' 'namespace %s.' % (qname, namespace)) ##################################################################### # # Faked WBEMConnection Instance methods # ##################################################################### def _fake_createinstance(self, namespace, **params): """ Implements a server responder for :meth:`~pywbem.WBEMConnection.CreateInstance` Create a CIM instance in the local repository of this class. Always use the namespace parameter assuming that pywbem.CreateInstance has captured any namespace in the instance path component. Raisess: CIMError: CIM_ERR_ALREADY_EXISTS CIMError: CIM_ERR_INVALID_CLASS """ new_instance = params['NewInstance'] if self._repo_lite: raise CIMError(CIM_ERR_NOT_SUPPORTED, 'CreateInstance not ' ' supported when repo_lite set.') if not isinstance(new_instance, CIMInstance): raise CIMError(CIM_ERR_INVALID_PARAMETER, 'NewInstance parameter is not a ' 'valid CIMInstance. Rcvd type: %s' % type(new_instance)) # Requires corresponding class to build path to be returned try: target_class = self._get_class(new_instance.classname, namespace, local_only=False, include_qualifiers=True, include_classorigin=True) except CIMError as ce: if ce.status_code == CIM_ERR_NOT_FOUND: raise CIMError(CIM_ERR_INVALID_CLASS, 'Cannot modify instance because its creation ' ' class %s does not exist in namespace %s.' % (new_instance.classname, namespace)) else: raise # Test all key properties in instance. This is our repository limit # since the repository cannot add values for key properties. We do # no allow creating key properties from class defaults. key_props = [p.name for p in six.itervalues(target_class.properties) if 'key' in p.qualifiers] for pn in key_props: if pn not in new_instance: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Key property %s not in NewInstance ' % pn) # If property not in instance, add it from class and use default value # from class for cprop_name in target_class.properties: if cprop_name not in new_instance: default_value = target_class.properties[cprop_name] new_instance[cprop_name] = default_value # Exception if property in instance but not class or types do not # match for ipname in new_instance: if ipname not in target_class.properties: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Property %s specified in NewInstance is not ' 'exposed by class %s in namespace %s' % (ipname, target_class.classname, namespace)) else: cprop = target_class.properties[ipname] iprop = new_instance.properties[ipname] if iprop.is_array != cprop.is_array or \ iprop.type != cprop.type: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Instance and class property %s types ' 'do not match: instance=%r, class=%r' % (ipname, iprop, cprop)) # Build instance path. We build the complete instance path new_instance.path = CIMInstanceName.from_instance( target_class, new_instance, namespace=namespace) try: # TODO:ks Future use internal function of repo to create namespace # for this repo. ex. _set_instance for inst in self.instances[namespace]: if inst.path == new_instance.path: raise CIMError(CIM_ERR_ALREADY_EXISTS, 'NewInstance already exists. %s in ' 'namespace %s.' % (new_instance.path, namespace)) self.instances[namespace].append(new_instance) except KeyError: self.instances[namespace] = [new_instance] if namespace not in self.methods: self.methods[namespace] = NocaseDict() # Create instance returns model path, path relative to namespace return self._make_tuple([deepcopy(new_instance.path)]) def _fake_modifyinstance(self, namespace, **params): """ Implements a server responder for :meth:`~pywbem.WBEMConnection.CreateInstance` Modify a CIM instance in the local repository. Raises: CIMError: CIM_ERR_ALREADY_EXISTS, CIM_ERR_INVALID_CLASS """ if self._repo_lite: raise CIMError(CIM_ERR_NOT_SUPPORTED, 'ModifyInstance not ' ' supported when repo_lite set.') inst_repo = self._get_instance_repo(namespace) modified_instance = deepcopy(params['ModifiedInstance']) property_list = params['PropertyList'] # Return if empty property list if property_list is not None and not property_list: return if modified_instance is not None and not modified_instance: return if not isinstance(modified_instance, CIMInstance): raise CIMError(CIM_ERR_INVALID_PARAMETER, 'The ModifiedInstance parameter is not a ' 'valid CIMInstance. Rcvd type: %s' % type(modified_instance)) # Classnames in instance and path must match if modified_instance.classname != modified_instance.path.classname: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'ModifyInstance classname in path and instance do ' 'not match. classname=%s, path.classname=%s' % (modified_instance.classname, modified_instance.path.classname)) # Get class including properties from superclasses try: target_class = self.GetClass(modified_instance.classname, namespace=namespace, LocalOnly=False, IncludeQualifiers=True, IncludeClassOrigin=True) except CIMError as ce: if ce.status_code == CIM_ERR_NOT_FOUND: raise CIMError(CIM_ERR_INVALID_CLASS, 'Cannot modify instance because its creation ' ' class %s does not exist in namespace %s.' % (modified_instance.classname, namespace)) else: raise # get key properties and all class props cl_props = [p.name for p in six.itervalues(target_class.properties)] key_props = [p.name for p in six.itervalues(target_class.properties) if 'key' in p.qualifiers] # Get original instance in repo. Does not copy the orig instance. mod_inst_path = modified_instance.path.copy() if modified_instance.path.namespace is None: mod_inst_path.namespace = namespace orig_instance_tup = self._find_instance(mod_inst_path, inst_repo) if orig_instance_tup[0] is None: raise CIMError(CIM_ERR_NOT_FOUND, 'Original Instance %s not found in namespace %s' % (modified_instance.path, namespace)) original_instance = orig_instance_tup[1] # Remove duplicate properties from property_list if property_list: if len(property_list) != len(set(property_list)): property_list = list(set(property_list)) # Test that all properties in modified instance and property list # are in the class if property_list: for p in property_list: if p not in cl_props: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Property %s in PropertyList not in class ' '%s' % (p, modified_instance.classname)) for p in modified_instance: if p not in cl_props: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Property %s in ModifiedInstance not in class ' ' %s' % (p, modified_instance.classname)) # Set the class value for properties in the property list but not # in the modified_instance. This sets just the value component. mod_inst_props = set(modified_instance.keys()) new_props = mod_inst_props.difference(set(cl_props)) if new_props: for new_prop in new_props: modified_instance[new_prop] = \ target_class.properties[new_prop].value # Remove all properties that do not change value between original # instance and modified instance for p in list(modified_instance): if original_instance[p] == modified_instance[p]: del modified_instance[p] # Confirm no key properties in remaining modified instance for p in key_props: if p in modified_instance: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'ModifyInstance cannot modify key property %s' % p) # Remove any properties from modified instance not in the property_list if property_list: for p in list(modified_instance): if p not in property_list: del modified_instance[p] # Exception if property in instance but not class or types do not # match for pname in modified_instance: if pname not in target_class.properties: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Property %s specified in ModifiedInstance is ' 'not exposed by class %s in namespace %s' % (pname, target_class.classname, namespace)) else: cprop = target_class.properties[pname] iprop = modified_instance.properties[pname] if iprop.is_array != cprop.is_array \ or iprop.type != cprop.type \ or iprop.array_size != cprop.array_size: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'Instance and class property name=%s type ' 'or other attributes do not match: ' 'instance=%r, class=%r' % (pname, iprop, cprop)) # Modify the value of properties in the repo with those from # modified instance index = orig_instance_tup[0] inst_repo[index].update(modified_instance.properties) return def _fake_getinstance(self, namespace, **params): """ Implements a mock server responder for :meth:`~pywbem.WBEMConnection.GetInstance`. Gets a single instance from the repository based on the InstanceName and filters it for PropertyList, etc. This method uses a common repository access method _get_instance to get, copy, and process the instance. Raises: CIMError: CIM_ERR_INVALID_NAMESPACE, CIM_ERR_INVALID_PARAMETER CIM_ERR_NOT_FOUND """ iname = params['InstanceName'] if iname.namespace is None: iname.namespace = namespace # If not repo lite, corresponding class must exist. if not self._repo_lite: if not self._class_exists(iname.classname, namespace): raise CIMError(CIM_ERR_INVALID_CLASS, 'Class %s for ' 'GetInstance of instance %s does not exist.' % (iname.classname, iname)) inst = self._get_instance(iname, namespace, params['PropertyList'], params['LocalOnly'], params['IncludeClassOrigin'], params['IncludeQualifiers']) return self._make_tuple([inst]) def _fake_deleteinstance(self, namespace, **params): """ Implements a mock server responder for :meth:`~pywbem.WBEMConnection.DeleteInstance`. This deletes a single instance from the mock repository based on the iname and namespace parameters. It does not attempt to delete referenceing instances (associations, etc. that reference this instance.) """ iname = params['InstanceName'] iname.namespace = namespace insts_repo = self._get_instance_repo(namespace) # if not repo_lite, Corresponding class must exist if not self._repo_lite: if not self._class_exists(iname.classname, namespace): raise CIMError(CIM_ERR_INVALID_CLASS, 'Class %s in namespace %s not found. ' ' Cannot delete instance %s' % (iname.classname, namespace, iname)) del_inst = None for enum_tup in enumerate(insts_repo): index = enum_tup[0] inst = enum_tup[1] if iname == inst.path: if del_inst is not None: raise CIMError(CIM_ERR_FAILED, 'Internal Error: Invalid ' ' Repository. Multiple instances with same ' ' path %s' % inst.path) # TODO:ks Future remove this test for duplicate inst paths since # we test for dups on insertion else: del insts_repo[index] del_inst = iname if not del_inst: raise CIMError(CIM_ERR_NOT_FOUND, 'Instance %s not found in ' 'repository namespace %s' % (iname, namespace)) def _fake_enumerateinstances(self, namespace, **params): """ Implements a server responder for :meth:`~pywbem.WBEMConnection.EnumerateInstances`. Gets a list of subclasses if the classes exist in the repository then executes getInstance for each to create the list of instances to be returned. Raises: CIMError: CIM_ERR_INVALID_NAMESPACE """ inst_repo = self._get_instance_repo(namespace) cname = params['ClassName'] assert isinstance(cname, CIMClassName) cname = cname.classname # If di False we use only properties from the original class. Modify # property list to limit properties to those from this class. This # only works if class exists. # if class not in repository, ignore di. di = params['DeepInheritance'] # If None, set to server default if di is None: di = DEFAULT_DEEP_INHERITANCE pl = params['PropertyList'] lo = params['LocalOnly'] if not self._repo_lite: # gets class propertylist which is may be localonly or all # superclasses cl = self._get_class(cname, namespace, local_only=lo) class_pl = cl.properties.keys() else: class_pl = None # if not lite repo and not di compute property list to filter # all instances to the properties in the target class as modified # by the PropertyList if not self._repo_lite: if not di: if pl is None: pl = class_pl else: # reduce pl to properties in class_properties pl = [pc for pc in class_pl if pc in pl] clns_dict = self._get_subclass_list_for_enums(cname, namespace) insts = [self._get_instance(inst.path, namespace, pl, None, # LocalOnly never gets passed params['IncludeClassOrigin'], params['IncludeQualifiers']) for inst in inst_repo if inst.path.classname in clns_dict] return self._make_tuple(insts) def _fake_enumerateinstancenames(self, namespace, **params): """ Implements a server responder for :meth:`~pywbem.WBEMConnection.EnumerateInstanceNames` Get instance names for instances that match the path define by `ClassName` and returns a list of the names. """ cname = params['ClassName'] assert isinstance(cname, CIMClassName) cname = cname.classname inst_repo = self._get_instance_repo(namespace) clns = self._get_subclass_list_for_enums(cname, namespace) inst_paths = [inst.path for inst in inst_repo if inst.path.classname in clns] rtn_paths = [deepcopy(path) for path in inst_paths] return self._make_tuple(rtn_paths) def _fake_execquery(self, namespace, **params): """ Implements a mock WBEM server responder for :meth:`~pywbem.WBEMConnection.ExecQuery` Executes the equilavent of the WBEMConnection ExecQuery for the querylanguage and query defined """ print('_fake_execquery ns %s, params %s' % (namespace, params)) self._get_instance_repo(namespace) raise CIMError(CIM_ERR_NOT_SUPPORTED, 'ExecQuery Not Implemented!') ##################################################################### # # Faked WBEMConnection Reference and Associator methods # ##################################################################### @staticmethod def _appendpath_unique(list_, path): """Append path to list if not already in list""" for p in list_: if p == path: return list_.append(path) def _return_assoc_tuple(self, objects): """ Create the property tuple for _imethod return of references, referencenames, associators, and associatornames methods. This is different than the get/enum imethod return tuples. It creates an OBJECTPATH for each object in the return list. _imethod call returns None when there are zero objects rather than a tuple with empty object path """ if objects: result = [(u'OBJECTPATH', {}, obj) for obj in objects] return self._make_tuple(result) return None def _return_assoc_class_tuples(self, rtn_classnames, namespace, iq, ico, pl): """ Creates the correct tuples of for associator and references class level responses from a list of classnames. This is special because the class level references and associators return a tuple of CIMClassName and CIMClass for every entry. """ rtn_tups = [] for cn in rtn_classnames: rtn_tups.append((CIMClassName(cn, namespace=namespace, host=self.host), self._get_class(cn, namespace=namespace, include_qualifiers=iq, include_classorigin=ico, property_list=pl))) return self._return_assoc_tuple(rtn_tups) def _classnamelist(self, classname, namespace): """Build a list of this class and its subclasses if classname is a string/CIMClassName or an empty list if classname is None. Differs from _subclass_names in that it includes classname """ if classname: cn = classname.classname if isinstance(classname, CIMClassName) \ else classname result = self._get_subclass_names(cn, namespace, True) result.append(classname) return result return [] def _classnamedict(self, classname, namespace): """Get from _classnamelist and cvt to NocaseDict""" clns = self._classnamelist(classname, namespace) rtn_dict = NocaseDict() for cln in clns: rtn_dict[cln] = cln return rtn_dict @staticmethod def _ref_prop_matches(prop, target_classname, ref_classname, resultclass_names, role): """ Test filters for a reference property Returns true if matches the criteria. Returns False if it does not match. The match criteria are: - target_classname == prop_reference_class - if result_classes are not None, ref_classname is in result_classes - If role is not None, prop name matches role """ assert prop.type == 'reference' if prop.reference_class.lower() == target_classname.lower(): if resultclass_names and ref_classname not in resultclass_names: return False if role and prop.name.lower() != role: return False return True return False @staticmethod def _assoc_prop_matches(prop, ref_classname, assoc_classes, result_classes, result_role): """ Test filters of a reference property and its associated entity Returns true if matches the criteria. Returns False if it does not match. Matches if ref_classname in assoc_classes, and result_role matches property name """ assert prop.type == 'reference' if assoc_classes and ref_classname not in assoc_classes: return False if result_classes and prop.reference_class not in result_classes: return False if result_role and prop.name.lower() != result_role: return False return True def _get_reference_classnames(self, classname, namespace, resultclass_name, role): """ Get list of classnames that are references for which this classname is a target filtered by the result_class and role parameters if they are none. This is a common method used by all of the other reference and associator methods to create a list of reference classnames Returns: list of classnames that satisfy the criteria. """ self._get_class_repo(namespace) result_classes = self._classnamedict(resultclass_name, namespace) rtn_classnames_set = set() role = role.lower() if role else role for cl in self._get_association_classes(namespace): for prop in six.itervalues(cl.properties): if prop.type == 'reference' and \ self._ref_prop_matches(prop, classname, cl.classname, result_classes, role): rtn_classnames_set.add(cl.classname) return list(rtn_classnames_set) def _get_reference_instnames(self, instname, namespace, resultclass_name, role): """ Get the reference instances from the repository for the target instname and filtered by the result_class and role parameters. Returns a list of the reference instance names. The returned list is the original, not a copy so the user must copy them """ insts_repo = self._get_instance_repo(namespace) if resultclass_name: # if there is a class repository get subclasses if self._get_class_repo(namespace): resultclass_dict = self._classnamedict(resultclass_name, namespace) else: resultclass_dict = NocaseDict(resultclass_name=resultclass_name) else: resultclass_dict = NocaseDict() instname.namespace = namespace rtn_instpaths = [] role = role.lower() if role else role # TODO:ks FUTURE: Make list from _get_reference_classnames if classes # exist. Otherwise set list to insts_repo to search every instance for inst in insts_repo: for prop in six.itervalues(inst.properties): if prop.type == 'reference': # does this prop instance name match target inst name if prop.value == instname: if resultclass_name: if inst.classname not in resultclass_dict: continue if role and prop.name.lower() != role: continue self._appendpath_unique(rtn_instpaths, inst.path) return rtn_instpaths def _get_associated_classnames(self, classname, namespace, assoc_class, result_class, result_role, role): """ Get list of classnames that are associated classes for which this classname is a target filtered by the assoc_class, role, result_class, and result_role parameters if they are none. This is a common method used by all of the other reference and associator methods to create a list of reference classnames Returns: list of classnames that satisfy the criteria. """ class_repo = self._get_class_repo(namespace) result_classes = self._classnamedict(result_class, namespace) assoc_classes = self._classnamedict(assoc_class, namespace) rtn_classnames_set = set() role = role.lower() if role else role result_role = result_role.lower() if result_role else result_role ref_clns = self._get_reference_classnames(classname, namespace, assoc_class, role) cls = [class_repo[cln] for cln in ref_clns] for cl in cls: for prop in six.itervalues(cl.properties): if prop.type == 'reference': if self._assoc_prop_matches(prop, cl.classname, assoc_classes, result_classes, result_role): rtn_classnames_set.add(prop.reference_class) return list(rtn_classnames_set) def _get_associated_instancenames(self, inst_name, namespace, assoc_class, result_class, result_role, role): """ Get the reference instances from the repository for the target instname and filtered by the result_class and role parameters. Returns a list of the reference instance names. The returned list is the original, not a copy so the user must copy them """ instance_repo = self._get_instance_repo(namespace) result_classes = self._classnamedict(result_class, namespace) assoc_classes = self._classnamedict(assoc_class, namespace) inst_name.namespace = namespace rtn_instpaths = [] role = role.lower() if role else role result_role = result_role.lower() if result_role else result_role ref_paths = self._get_reference_instnames(inst_name, namespace, assoc_class, role) # Get associated instance names for ref_path in ref_paths: inst = self._find_instance(ref_path, instance_repo)[1] for prop in six.itervalues(inst.properties): if prop.type == 'reference': if prop.value == inst_name: if assoc_class and inst.classname not in assoc_classes: continue if role and prop.name.lower() != role: continue else: if result_class and (prop.value.classname not in result_classes): continue if result_role and prop.name.lower() != result_role: continue self._appendpath_unique(rtn_instpaths, prop.value) return rtn_instpaths def _fake_referencenames(self, namespace, **params): """ Implements a mock WBEM server responder for :meth:`~pywbem.WBEMConnection.ReferenceNames` """ assert params['ResultClass'] is None or \ isinstance(params['ResultClass'], CIMClassName) rc = None if params['ResultClass'] is None else \ params['ResultClass'].classname role = params['Role'] obj_name = params['ObjectName'] classname = obj_name.classname if isinstance(obj_name, CIMClassName): ref_classnames = self._get_reference_classnames(classname, namespace, rc, role) ref_result = [CIMClassName(classname=cn, host=self.host, namespace=namespace) for cn in ref_classnames] return self._return_assoc_tuple(ref_result) assert isinstance(obj_name, CIMInstanceName) ref_paths = self._get_reference_instnames(obj_name, namespace, rc, role) rtn_names = [deepcopy(r) for r in ref_paths] for iname in rtn_names: if iname.host is None: iname.host = self.host return self._return_assoc_tuple(rtn_names) def _fake_references(self, namespace, **params): """ Implements a mock WBEM server responder for :meth:`~pywbem.WBEMConnection.References` """ rc = None if params['ResultClass'] is None else \ params['ResultClass'].classname role = params['Role'] obj_name = params['ObjectName'] classname = obj_name.classname pl = params['PropertyList'] ico = params['IncludeClassOrigin'] iq = params['IncludeQualifiers'] if isinstance(obj_name, CIMClassName): rtn_classnames = self._get_reference_classnames( classname, namespace, rc, role) # returns list of tuples of (CIMClassname, CIMClass) return self._return_assoc_class_tuples(rtn_classnames, namespace, iq, ico, pl) assert isinstance(obj_name, CIMInstanceName) ref_paths = self._get_reference_instnames(obj_name, namespace, rc, role) rtn_insts = [] for path in ref_paths: rtn_insts.append(self._get_instance( path, namespace, None, params['PropertyList'], params['IncludeClassOrigin'], params['IncludeQualifiers'])) for inst in rtn_insts: if inst.path.host is None: inst.path.host = self.host return self._return_assoc_tuple(rtn_insts) def _fake_associatornames(self, namespace, **params): # pylint: disable=invalid-name """ Implements a mock WBEM server responder for :meth:`~pywbem.WBEMConnection.AssociatorNames` """ self._get_instance_repo(namespace) rc = None if params['ResultClass'] is None else \ params['ResultClass'].classname ac = None if params['AssocClass'] is None else \ params['AssocClass'].classname role = params['Role'] result_role = params['ResultRole'] obj_name = params['ObjectName'] classname = obj_name.classname if isinstance(obj_name, CIMClassName): rtn_classnames = self._get_associated_classnames(classname, namespace, ac, rc, result_role, role) assoc_result = [CIMClassName(classname=cn, host=self.host, namespace=namespace) for cn in rtn_classnames] return self._return_assoc_tuple(assoc_result) assert isinstance(obj_name, CIMInstanceName) rtn_paths = self._get_associated_instancenames(obj_name, namespace, ac, rc, result_role, role) results = [deepcopy(p) for p in rtn_paths] for iname in results: if iname.host is None: iname.host = self.host return self._return_assoc_tuple(results) def _fake_associators(self, namespace, **params): """ Implements a mock WBEM server responder for :meth:`~pywbem.WBEMConnection.Associators` """ self._get_instance_repo(namespace) rc = None if params['ResultClass'] is None else \ params['ResultClass'].classname ac = None if params['AssocClass'] is None else \ params['AssocClass'].classname role = params['Role'] result_role = params['ResultRole'] obj_name = params['ObjectName'] classname = obj_name.classname pl = params['PropertyList'] ico = params['IncludeClassOrigin'] iq = params['IncludeQualifiers'] if isinstance(obj_name, CIMClassName): rtn_classnames = self._get_associated_classnames(classname, namespace, ac, rc, result_role, role) # returns list of tuples of (CIMClassname, CIMClass) return self._return_assoc_class_tuples(rtn_classnames, namespace, iq, ico, pl) assert isinstance(obj_name, CIMInstanceName) assoc_names = self._get_associated_instancenames(obj_name, namespace, ac, rc, result_role, role) results = [] for obj_name in assoc_names: results.append(self._get_instance( obj_name, namespace, None, params['PropertyList'], params['IncludeClassOrigin'], params['IncludeQualifiers'])) return self._return_assoc_tuple(results) ##################################################################### # # Faked WBEMConnection Open and Pull Instances Methods # # All of the following methods take the simplistic approach of getting # all of the data from the original functions and saving it # in the contexts dictionary. # We could improve performance by using an iterator to get the data # but are taking the simple approach since this is a mock tool. # ##################################################################### @staticmethod def _create_contextid(): """Return a new uuid for an enumeration context""" return str(uuid.uuid4()) @staticmethod def _make_pull_imethod_resp(objs, eos, context_id): """ Create the correct imethod response for the open and pull methods """ eos_tup = (u'EndOfSequence', None, eos) enum_ctxt_tup = (u'EnumerationContext', None, context_id) return [("IRETURNVALUE", {}, objs), enum_ctxt_tup, eos_tup] def _open_response(self, objects, namespace, pull_type, **params): """ Build an open... response once the objects have been extracted from the repository. """ max_obj_cnt = params['MaxObjectCount'] if max_obj_cnt is None: max_obj_cnt = _DEFAULT_MAX_OBJECT_COUNT default_server_timeout = 40 timeout = default_server_timeout if params['OperationTimeout'] is None \ else params['OperationTimeout'] if len(objects) <= max_obj_cnt: eos = u'TRUE' context_id = "" rtn_inst_names = objects else: eos = u'FALSE' context_id = self._create_contextid() # TODO:ks Future. Use the timeout along with response delay. Then # user could timeout pulls. This means adding timer test to # pulls and close. Timer should be used to close old contexts # also. self.enumeration_contexts[context_id] = {'pull_type': pull_type, 'data': objects, 'namespace': namespace, 'time': time.clock(), 'interoptimeout': timeout} rtn_inst_names = objects[0:max_obj_cnt] del objects[0: max_obj_cnt] return self._make_pull_imethod_resp(rtn_inst_names, eos, context_id) def _pull_response(self, namespace, req_type, **params): """ Common method for all of the Pull methods. Since all of the pull methods operate independent of the type of data, this single function severs as common code This method validates the namespace, gets data on the enumeration sequence from the enumeration_contexts table, validates the pull type, and returns the required number of objects. This method assumes the same context_id throughout the sequence. Raises: CIMError: CIM_ERR_INVALID_ENUMERATION_CONTEXT """ self._get_instance_repo(namespace) context_id = params['EnumerationContext'] try: context_data = self.enumeration_contexts[context_id] except KeyError: raise CIMError(CIM_ERR_INVALID_ENUMERATION_CONTEXT, 'EnumerationContext %s not found in mock server ' 'contexts.' % context_id) if context_data['pull_type'] != req_type: raise CIMError(CIM_ERR_INVALID_ENUMERATION_CONTEXT, 'Invalid pull operations %s does not match expected ' '%s for EnumerationContext %s' % (context_data['pull_type'], req_type, context_id)) objs_list = context_data['data'] max_obj_cnt = params['MaxObjectCount'] if not max_obj_cnt: max_obj_cnt = _DEFAULT_MAX_OBJECT_COUNT if len(objs_list) <= max_obj_cnt: eos = u'TRUE' rtn_objs_list = objs_list del self.enumeration_contexts[context_id] context_id = "" else: eos = u'FALSE' rtn_objs_list = objs_list[0: max_obj_cnt] del objs_list[0: max_obj_cnt] return self._make_pull_imethod_resp(rtn_objs_list, eos, context_id) @staticmethod def _validate_open_params(**params): """ Validate the fql parameters and if invalid, generate exception """ if not params['FilterQueryLanguage'] and params['FilterQuery']: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'FilterQuery without ' 'FilterQueryLanguage definition is invalid') if params['FilterQueryLanguage']: if params['FilterQueryLanguage'] != 'DMTF:FQL': raise CIMError(CIM_ERR_QUERY_LANGUAGE_NOT_SUPPORTED, 'FilterQueryLanguage %s not supported' % params['FilterQueryLanguage']) ot = params['OperationTimeout'] if ot: if not isinstance(ot, six.integer_types) or ot < 0 \ or ot > OPEN_MAX_TIMEOUT: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'OperationTimeout %s must be positive integer ' ' less than %s' % (ot, OPEN_MAX_TIMEOUT)) def _fake_openenumerateinstancepaths(self, namespace, **params): # pylint: disable=invalid-name """ Implements WBEM server responder for :meth:`~pywbem.WBEMConnection.OpenEnumerationInstancePaths` with data from the instance repository. """ self._get_instance_repo(namespace) self._validate_open_params(**params) result_t = self._fake_enumerateinstancenames(namespace, **params) return self._open_response(result_t[0][2], namespace, 'PullInstancePaths', **params) def _fake_openenumerateinstances(self, namespace, **params): """ Implements WBEM server responder for :meth:`~pywbem.WBEMConnection.OpenEnumerationInstances` with data from the instance repository. """ self._get_instance_repo(namespace) self._validate_open_params(**params) result_t = self._fake_enumerateinstances(namespace, **params) return self._open_response(result_t[0][2], namespace, 'PullInstancesWithPath', **params) def _fake_openreferenceinstancepaths(self, namespace, **params): # pylint: disable=invalid-name """ Implements WBEM server responder for :meth:`~pywbem.WBEMConnection.OpenReferenceInstancePaths` with data from the instance repository. """ self._get_instance_repo(namespace) self._validate_open_params(**params) params['ObjectName'] = params['InstanceName'] del params['InstanceName'] result = self._fake_referencenames(namespace, **params) objects = [] if result is None else [x[2] for x in result[0][2]] return self._open_response(objects, namespace, 'PullInstancePaths', **params) def _fake_openreferenceinstances(self, namespace, **params): """ Implements WBEM server responder for :meth:`~pywbem.WBEMConnection.OpenReferenceInstances` with data from the instance repository. """ self._get_instance_repo(namespace) self._validate_open_params(**params) params['ObjectName'] = params['InstanceName'] del params['InstanceName'] result = self._fake_references(namespace, **params) objects = [] if result is None else [x[2] for x in result[0][2]] return self._open_response(objects, namespace, 'PullInstancesWithPath', **params) def _fake_openassociatorinstancepaths(self, namespace, **params): # pylint: disable=invalid-name """ Implements WBEM server responder for :meth:`~pywbem.WBEMConnection.OpenAssociatorInstancePaths` with data from the instance repository. """ self._get_instance_repo(namespace) self._validate_open_params(**params) params['ObjectName'] = params['InstanceName'] del params['InstanceName'] result = self._fake_associatornames(namespace, **params) objects = [] if result is None else [x[2] for x in result[0][2]] return self._open_response(objects, namespace, 'PullInstancePaths', **params) def _fake_openassociatorinstances(self, namespace, **params): """ Implements WBEM server responder for WBEMConnection.OpenAssociatorInstances with data from the instance repository. """ self._get_instance_repo(namespace) self._validate_open_params(**params) params['ObjectName'] = params['InstanceName'] del params['InstanceName'] result = self._fake_associators(namespace, **params) objects = [] if result is None else [x[2] for x in result[0][2]] return self._open_response(objects, namespace, 'PullInstancesWithPath', **params) def _fake_openqueryinstances(self, namespace, **params): # pylint: disable=invalid-name """ Implements WBEM server responder for :meth:`~pywbem.WBEMConnection.OpenQueryInstances` with data from the instance repository. """ self._get_instance_repo(namespace) self._validate_open_params(**params) result = self._fake_execquery(namespace, **params) objects = [] if result is None else [x[2] for x in result[0][2]] return self._open_response(objects, namespace, 'PullInstancesWithPath', **params) def _fake_pullinstanceswithpath(self, namespace, **params): """ Implements WBEM server responder for :meth:`~pywbem.WBEMConnection.OpenPullInstancesWithPath` with data from the instance repository. """ return self._pull_response(namespace, 'PullInstancesWithPath', **params) def _fake_pullinstancepaths(self, namespace, **params): """ Implements WBEM server responder for :meth:`~pywbem.WBEMConnection.OpenPullInstancePaths` with data from the instance repository. """ return self._pull_response(namespace, 'PullInstancePaths', **params) def _fake_pullinstances(self, namespace, **params): """ Implements WBEM server responder for :meth:`~pywbem.WBEMConnection.OpenPullInstances` with data from the instance repository. """ return self._pull_response(namespace, 'PullInstances', **params) def _fake_closeenumeration(self, namespace, **params): """ Implements WBEM server responder for :meth:`~pywbem.WBEMConnection.CloseEnumeration` with data from the instance repository. If the EnumerationContext is valid it removes it from the context repository. Otherwise it returns an exception. """ self._get_instance_repo(namespace) context_id = params['EnumerationContext'] try: context_data = self.enumeration_contexts[context_id] # This is probably relatively useless because pywbem handles # namespace internally but it could catch an error if user plays # with the context. if context_data['namespace'] != namespace: raise CIMError(CIM_ERR_INVALID_NAMESPACE, 'Incorrect Namespace %s for CloseEnumeration %s ' % (namespace, context_id)) except KeyError: raise CIMError(CIM_ERR_INVALID_ENUMERATION_CONTEXT, 'EnumerationContext %s not found in mock server ' 'EnumerationContexts. ' % context_id) del self.enumeration_contexts[context_id] ##################################################################### # # Faked WBEMConnection InvokeMethod # ##################################################################### def _fake_invokemethod(self, methodname, objectname, Params, **params): # pylint: disable=invalid-name """ Implements a mock WBEM server responder for :meth:`~pywbem.WBEMConnection.InvokeMethod` This responder calls a function defined by an entry in the methods repository. The return from that function is returned to the user. Input params are MethodName, ObjectName, and Params The return is espected to be the same as the return defined by WBEMConnection.InvokeMethod (ReturnValue, OutputParameters). """ if isinstance(objectname, (CIMInstanceName, CIMClassName)): localobject = deepcopy(objectname) if localobject.namespace is None: localobject.namespace = self.default_namespace localobject.host = None elif isinstance(objectname, six.string_types): # a string is always interpreted as a class name localobject = CIMClassName(objectname, namespace=self.default_namespace) else: raise TypeError('FakedWBEMConnection InvokeMethod invalid type for ' 'objectname: %s' % type(objectname)) namespace = localobject.namespace try: methodsrepo = self._get_methods_repo(namespace) except CIMError as ce: # If invoke indicates method repo not built, create it. if ce.status_code == CIM_ERR_INVALID_NAMESPACE: if namespace in self.classes or \ namespace in self.instances: raise CIMError(CIM_ERR_METHOD_NOT_FOUND, 'Method %s in namespace %s not registered ' 'in repository' % (methodname, namespace)) else: raise # Find the methods entry corresponding to classname. It must be in # the class defined by classname or one of its superclasses that # includes this method. Since the classes in the repo are not # resolved # This raises CIM_ERR_NOT_FOUND or CIM_ERR_INVALID_NAMESPACE # Uses local_only = False to get characteristics from super classes # and include_class_origin to get origin of method in hiearchy cc = self._get_class(localobject.classname, namespace, local_only=False, include_qualifiers=True, include_classorigin=True) # Determine if method defined in classname defined in # the classorigin of the method try: target_cln = cc.methods[methodname].class_origin except KeyError: raise CIMError(CIM_ERR_METHOD_NOT_FOUND, 'Method %s not found ' 'in class %s.' % (methodname, localobject.classname)) if target_cln != cc.classname: # TODO FUTURE: add method to repo that allows privileged users # direct access so we don't have to go through _get_class and can # test classes directly in repo tcc = self._get_class(target_cln, namespace, local_only=False, include_qualifiers=True, include_classorigin=True) if methodname not in tcc.methods: raise CIMError(CIM_ERR_METHOD_NOT_FOUND, 'Method %s not found ' 'in origin class %s derived from ' 'objectname class %s' % (methodname, target_cln, localobject.classname)) # Test for target class in methods repo try: methods = methodsrepo[target_cln] except KeyError: raise CIMError(CIM_ERR_METHOD_NOT_FOUND, 'Class %s for method %s in namespace %s not ' 'registered in methods repository' % (localobject.classname, methodname, namespace)) try: bound_method = methods[methodname] except KeyError: raise CIMError(CIM_ERR_METHOD_NOT_FOUND, 'Method %s in namespace %s not registered in ' ' methods repository. Internal error' % (methodname, namespace)) if bound_method is None: raise CIMError(CIM_ERR_METHOD_NOT_FOUND, 'Class %s for method %s in registered in methods ' 'repoository namespace %s' % (localobject.classname, methodname, namespace)) # Map the Params and **params into a single no-case dictionary # of CIMParameters params_dict = NocaseDict() if Params: for param in Params: if isinstance(param, CIMParameter): params_dict[param.name] = param elif isinstance(param, tuple): params_dict[param[0]] = CIMParameter(param[0], cimtype(param[1]), value=param[1]) else: raise CIMError(CIM_ERR_INVALID_PARAMETER, 'InvokeMethod Param %s invalid type, %s. ' 'Expected tuple or CIMParameter' % (param, type(param))) if params: for param in params: params_dict[param] = CIMParameter(param, cimtype(param[param]), value=param[param]) # Call the registered method and catch exceptions. try: result = bound_method(self, methodname, localobject, **params_dict) except CIMError: raise except Exception as ex: exc_type, exc_value, exc_traceback = sys.exc_info() tb = repr(traceback.format_exception(exc_type, exc_value, exc_traceback)) raise CIMError(CIM_ERR_FAILED, 'Exception failure of invoked ' 'method %s in namespace %s with ' 'input localobject %r, parameters ' '%r. Exception: %r\nTraceback\n%s' % (methodname, namespace, localobject, params, ex, tb)) # test for valid data in response. if not isinstance(result, (list, tuple)): raise CIMError(CIM_ERR_FAILED, 'Callback method returned %s ' 'response type. Expected list or tuple' % type(result)) for param in result[1]: if not isinstance(param, CIMParameter): raise CIMError(CIM_ERR_FAILED, 'Callback method returned ' '%s response type. Expected CIMParameter. ' % type(param)) # Map output params to NocaseDict to be compatible with return # from _methodcall. The input list is just CIMParameters output_params = NocaseDict() for param in result[1]: output_params[param.name] = param.value return (result[0], output_params)