Skip to content
Snippets Groups Projects
tests.py 17.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • Benjamin Jakimow's avatar
    Benjamin Jakimow committed
            # -*- coding: utf-8 -*-
    """
    /***************************************************************************
                                  EO Time Series Viewer
                                  -------------------
            begin                : 2015-08-20
            git sha              : $Format:%H$
            copyright            : (C) 2017 by HU-Berlin
            email                : benjamin.jakimow@geo.hu-berlin.de
     ***************************************************************************/
    
    /***************************************************************************
     *                                                                         *
     *   This program is free software; you can redistribute it and/or modify  *
     *   it under the terms of the GNU General Public License as published by  *
     *   the Free Software Foundation; either version 2 of the License, or     *
     *   (at your option) any later version.                                   *
     *                                                                         *
     ***************************************************************************/
    """
    # noinspection PyPep8Naming
    
    import os, re, io, importlib, uuid
    
    
    import qgis.testing
    
    
    Benjamin Jakimow's avatar
    Benjamin Jakimow committed
    from unittest import TestCase
    from timeseriesviewer import *
    from timeseriesviewer.timeseries import *
    from timeseriesviewer import DIR_EXAMPLES
    from timeseriesviewer.utils import file_search
    from osgeo import ogr, osr, gdal, gdal_array
    import example
    
    
    SHOW_GUI = True
    
    Benjamin Jakimow's avatar
    Benjamin Jakimow committed
    def testRasterFiles()->list:
        return list(file_search(os.path.dirname(example.__file__), '*.tif', recursive=True))
    
    
    def createTimeSeries(self) -> TimeSeries:
        files = testRasterFiles()
        TS = TimeSeries()
        self.assertIsInstance(TS, TimeSeries)
        TS.addSources(files)
        self.assertTrue(len(TS) > 0)
        return TS
    
    Benjamin Jakimow's avatar
    Benjamin Jakimow committed
    def initQgisApplication(*args, qgisResourceDir:str=None, **kwds)->QgsApplication:
    
        """
        Initializes a QGIS Environment
        :return: QgsApplication instance of local QGIS installation
        """
        if isinstance(QgsApplication.instance(), QgsApplication):
            return QgsApplication.instance()
        else:
    
            if not 'QGIS_PREFIX_PATH' in os.environ.keys():
                raise Exception('env variable QGIS_PREFIX_PATH not set')
    
    
            if sys.platform == 'darwin':
    
                assert '.app' in qgis.__file__, 'Can not locate path of QGIS.app'
    
                PATH_QGIS_APP = re.search(r'.*\.app', qgis.__file__).group()
    
                QApplication.addLibraryPath(os.path.join(PATH_QGIS_APP, *['Contents', 'PlugIns']))
                QApplication.addLibraryPath(os.path.join(PATH_QGIS_APP, *['Contents', 'PlugIns', 'qgis']))
    
            qgsApp = qgis.testing.start_app()
    
    
            if not isinstance(qgisResourceDir, str):
                parentDir = os.path.dirname(os.path.dirname(__file__))
                resourceDir = os.path.join(parentDir, 'qgisresources')
                if os.path.exists(resourceDir):
                    qgisResourceDir = resourceDir
    
            if isinstance(qgisResourceDir, str) and os.path.isdir(qgisResourceDir):
                modules = [m for m in os.listdir(qgisResourceDir) if re.search(r'[^_].*\.py', m)]
                modules = [m[0:-3] for m in modules]
                for m in modules:
                    mod = importlib.import_module('qgisresources.{}'.format(m))
                    if "qInitResources" in dir(mod):
                        mod.qInitResources()
    
    
            #initiate a PythonRunner instance if None exists
            if not QgsPythonRunner.isValid():
                r = PythonRunnerImpl()
                QgsPythonRunner.setInstance(r)
            return qgsApp
    
    
    
    class QgisMockup(QgisInterface):
        """
        A "fake" QGIS Desktop instance that should provide all the inferfaces a plugin developer might need (and nothing more)
        """
    
        def pluginManagerInterface(self)->QgsPluginManagerInterface:
            return self.mPluginManager
    
        @staticmethod
        def create()->QgisInterface:
            """
            Create the QgisMockup and sets the global variables
            :return: QgisInterface
            """
    
            iface = QgisMockup()
    
            import qgis.utils
            # import processing
            # p = processing.classFactory(iface)
            if not isinstance(qgis.utils.iface, QgisInterface):
    
                import processing
                qgis.utils.iface = iface
                processing.Processing.initialize()
    
                import pkgutil
                prefix = str(processing.__name__ + '.')
                for importer, modname, ispkg in pkgutil.walk_packages(processing.__path__, prefix=prefix):
                    try:
                        module = __import__(modname, fromlist="dummy")
                        if hasattr(module, 'iface'):
                            print(modname)
                            module.iface = iface
                    except:
                        pass
            #set 'home_plugin_path', which is required from the QGIS Plugin manager
            assert qgis.utils.iface == iface
            qgis.utils.home_plugin_path = os.path.join(QgsApplication.instance().qgisSettingsDirPath(), *['python', 'plugins'])
            return iface
    
        def __init__(self, *args):
            # QgisInterface.__init__(self)
            super(QgisMockup, self).__init__()
    
            self.mCanvas = QgsMapCanvas()
            self.mCanvas.blockSignals(False)
            self.mCanvas.setCanvasColor(Qt.black)
            self.mCanvas.extentsChanged.connect(self.testSlot)
            self.mLayerTreeView = QgsLayerTreeView()
            self.mRootNode = QgsLayerTree()
            self.mLayerTreeModel = QgsLayerTreeModel(self.mRootNode)
            self.mLayerTreeView.setModel(self.mLayerTreeModel)
            self.mLayerTreeMapCanvasBridge = QgsLayerTreeMapCanvasBridge(self.mRootNode, self.mCanvas)
            self.mLayerTreeMapCanvasBridge.setAutoSetupOnFirstLayer(True)
    
            import pyplugin_installer.installer
            PI = pyplugin_installer.instance()
            self.mPluginManager = QgsPluginManagerMockup()
    
            self.ui = QMainWindow()
    
    
    
            self.mMessageBar = QgsMessageBar()
            mainFrame = QFrame()
    
            self.ui.setCentralWidget(mainFrame)
            self.ui.setWindowTitle('QGIS Mockup')
    
    
            l = QHBoxLayout()
            l.addWidget(self.mLayerTreeView)
            l.addWidget(self.mCanvas)
            v = QVBoxLayout()
            v.addWidget(self.mMessageBar)
            v.addLayout(l)
            mainFrame.setLayout(v)
            self.ui.setCentralWidget(mainFrame)
            self.lyrs = []
            self.createActions()
    
        def iconSize(self, dockedToolbar=False):
            return QSize(30,30)
    
        def testSlot(self, *args):
            # print('--canvas changes--')
            s = ""
    
        def mainWindow(self):
            return self.ui
    
    
        def addToolBarIcon(self, action):
            assert isinstance(action, QAction)
    
        def removeToolBarIcon(self, action):
            assert isinstance(action, QAction)
    
    
        def addVectorLayer(self, path, basename=None, providerkey=None):
            if basename is None:
                basename = os.path.basename(path)
            if providerkey is None:
                bn, ext = os.path.splitext(basename)
    
                providerkey = 'ogr'
            l = QgsVectorLayer(path, basename, providerkey)
            assert l.isValid()
            QgsProject.instance().addMapLayer(l, True)
            self.mRootNode.addLayer(l)
            self.mLayerTreeMapCanvasBridge.setCanvasLayers()
            s = ""
    
        def legendInterface(self):
            return None
    
        def addRasterLayer(self, path, baseName=''):
            l = QgsRasterLayer(path, os.path.basename(path))
            self.lyrs.append(l)
            QgsProject.instance().addMapLayer(l, True)
            self.mRootNode.addLayer(l)
            self.mLayerTreeMapCanvasBridge.setCanvasLayers()
            return
    
            cnt = len(self.canvas.layers())
    
            self.canvas.setLayerSet([QgsMapCanvasLayer(l)])
            l.dataProvider()
            if cnt == 0:
                self.canvas.mapSettings().setDestinationCrs(l.crs())
                self.canvas.setExtent(l.extent())
    
                spatialExtent = SpatialExtent.fromMapLayer(l)
                # self.canvas.blockSignals(True)
                self.canvas.setDestinationCrs(spatialExtent.crs())
                self.canvas.setExtent(spatialExtent)
                # self.blockSignals(False)
                self.canvas.refresh()
    
            self.canvas.refresh()
    
        def createActions(self):
            m = self.ui.menuBar().addAction('Add Vector')
            m = self.ui.menuBar().addAction('Add Raster')
    
        def mapCanvas(self):
            return self.mCanvas
    
        def mapNavToolToolBar(self):
            super().mapNavToolToolBar()
    
        def messageBar(self, *args, **kwargs):
            return self.mMessageBar
    
        def rasterMenu(self):
            super().rasterMenu()
    
        def vectorMenu(self):
            super().vectorMenu()
    
        def viewMenu(self):
            super().viewMenu()
    
        def windowMenu(self):
            super().windowMenu()
    
        def zoomFull(self, *args, **kwargs):
            super().zoomFull(*args, **kwargs)
    
    
    
    class TestObjects():
        """
        Class with static routines to create test objects
        """
        @staticmethod
        def inMemoryImage(nl=10, ns=20, nb=1, crs='EPSG:32632', eType=gdal.GDT_Byte, nc:int=0, path:str=None):
            from timeseriesviewer.classification.classificationscheme import ClassificationScheme
    
            scheme = None
            if nc is None:
                nc = 0
            if nc > 0:
                eType = gdal.GDT_Byte if nc < 256 else gdal.GDT_Int16
                scheme = ClassificationScheme()
                scheme.createClasses(nc)
    
            drv = gdal.GetDriverByName('GTiff')
            assert isinstance(drv, gdal.Driver)
    
            if not isinstance(path, str):
                if nc > 0:
                    path = '/vsimem/testClassification.{}.tif'.format(str(uuid.uuid4()))
                else:
                    path = '/vsimem/testImage.{}.tif'.format(str(uuid.uuid4()))
    
            ds = drv.Create(path, ns, nl, bands=nb, eType=eType)
            assert isinstance(ds, gdal.Dataset)
            if isinstance(crs, str):
                c = QgsCoordinateReferenceSystem(crs)
                ds.SetProjection(c.toWkt())
            ds.SetGeoTransform([0,1.0,0, \
                                0,0,-1.0])
    
            assert isinstance(ds, gdal.Dataset)
            for b in range(1, nb + 1):
                band = ds.GetRasterBand(b)
    
                if isinstance(scheme, ClassificationScheme) and b == 1:
                    array = np.zeros((nl, ns), dtype=np.uint8) - 1
                    y0 = 0
    
    
                    step = int(np.ceil(float(nl) / len(scheme)))
    
                    for i, c in enumerate(scheme):
                        y1 = min(y0 + step, nl - 1)
                        array[y0:y1, :] = c.label()
                        y0 += y1 + 1
                    band.SetCategoryNames(scheme.classNames())
                    band.SetColorTable(scheme.gdalColorTable())
                else:
                    #create random data
                    array = np.random.random((nl, ns))
                    if eType == gdal.GDT_Byte:
                        array = array *256
                        array = array.astype(np.byte)
                    elif eType == gdal.GDT_Int16:
                        array = array * 2**16
                        array = array.astype(np.int16)
                    elif eType == gdal.GDT_Int32:
                        array = array * 2 ** 32
                        array = array.astype(np.int32)
    
                band.WriteArray(array)
            ds.FlushCache()
            return ds
    
        @staticmethod
        def createDropEvent(mimeData:QMimeData):
            """Creates a QDropEvent conaining the provided QMimeData"""
            return QDropEvent(QPointF(0, 0), Qt.CopyAction, mimeData, Qt.LeftButton, Qt.NoModifier)
    
    
        @staticmethod
        def processingAlgorithm():
    
            from qgis.core import QgsProcessingAlgorithm
    
            class TestProcessingAlgorithm(QgsProcessingAlgorithm):
    
                def __init__(self):
                    super(TestProcessingAlgorithm, self).__init__()
                    s = ""
    
                def createInstance(self):
                    return TestProcessingAlgorithm()
    
                def name(self):
                    return 'exmaplealg'
    
                def displayName(self):
                    return 'Example Algorithm'
    
                def groupId(self):
                    return 'exampleapp'
    
                def group(self):
                    return 'TEST APPS'
    
                def initAlgorithm(self, configuration=None):
                    self.addParameter(QgsProcessingParameterRasterLayer('pathInput', 'The Input Dataset'))
                    self.addParameter(
                        QgsProcessingParameterNumber('value', 'The value', QgsProcessingParameterNumber.Double, 1, False,
                                                     0.00, 999999.99))
                    self.addParameter(QgsProcessingParameterRasterDestination('pathOutput', 'The Output Dataset'))
    
                def processAlgorithm(self, parameters, context, feedback):
                    assert isinstance(parameters, dict)
                    assert isinstance(context, QgsProcessingContext)
                    assert isinstance(feedback, QgsProcessingFeedback)
    
    
                    outputs = {}
                    return outputs
    
            return TestProcessingAlgorithm()
    
    
    
    
    class QgsPluginManagerMockup(QgsPluginManagerInterface):
    
    
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
    
    
        def addPluginMetadata(self, *args, **kwargs):
            super().addPluginMetadata(*args, **kwargs)
    
        def addToRepositoryList(self, *args, **kwargs):
            super().addToRepositoryList(*args, **kwargs)
    
        def childEvent(self, *args, **kwargs):
            super().childEvent(*args, **kwargs)
    
        def clearPythonPluginMetadata(self, *args, **kwargs):
            #super().clearPythonPluginMetadata(*args, **kwargs)
            pass
    
        def clearRepositoryList(self, *args, **kwargs):
            super().clearRepositoryList(*args, **kwargs)
    
        def connectNotify(self, *args, **kwargs):
            super().connectNotify(*args, **kwargs)
    
        def customEvent(self, *args, **kwargs):
            super().customEvent(*args, **kwargs)
    
        def disconnectNotify(self, *args, **kwargs):
            super().disconnectNotify(*args, **kwargs)
    
        def isSignalConnected(self, *args, **kwargs):
            return super().isSignalConnected(*args, **kwargs)
    
        def pluginMetadata(self, *args, **kwargs):
            super().pluginMetadata(*args, **kwargs)
    
        def pushMessage(self, *args, **kwargs):
            super().pushMessage(*args, **kwargs)
    
        def receivers(self, *args, **kwargs):
            return super().receivers(*args, **kwargs)
    
        def reloadModel(self, *args, **kwargs):
            super().reloadModel(*args, **kwargs)
    
        def sender(self, *args, **kwargs):
            return super().sender(*args, **kwargs)
    
        def senderSignalIndex(self, *args, **kwargs):
            return super().senderSignalIndex(*args, **kwargs)
    
        def showPluginManager(self, *args, **kwargs):
            super().showPluginManager(*args, **kwargs)
    
        def timerEvent(self, *args, **kwargs):
            super().timerEvent(*args, **kwargs)
    
    
    class PythonRunnerImpl(QgsPythonRunner):
        """
        A Qgs PythonRunner implementation
        """
    
        def __init__(self):
            super(PythonRunnerImpl, self).__init__()
    
    
        def evalCommand(self, cmd:str, result:str):
            try:
                o = compile(cmd)
            except Exception as ex:
                result = str(ex)
                return False
            return True
    
        def runCommand(self, command, messageOnError=''):
            try:
                o = compile(command, 'fakemodule', 'exec')
                exec(o)
            except Exception as ex:
                messageOnError = str(ex)
                command = ['{}:{}'.format(i+1, l) for i,l in enumerate(command.splitlines())]
                print('\n'.join(command), file=sys.stderr)
                raise ex
                return False
            return True
    
    Benjamin Jakimow's avatar
    Benjamin Jakimow committed
    
    
    class TestObjects():
        """
        Creates objects to be used for testing. It is prefered to generate objects in-memory.
        """
    
        @staticmethod
        def createTimeSeries():
    
            TS = TimeSeries()
            files = file_search(DIR_EXAMPLES, '*.bsq', recursive=True)
            TS.addSources(files)
            return TS
    
        @staticmethod
        def createTimeSeriesStacks():
            vsiDir = '/vsimem/tmp'
            from timeseriesviewer.temporalprofiles2d import date2num
            ns = 50
            nl = 100
    
            r1 = np.arange('2000-01-01', '2005-06-14', step=np.timedelta64(16, 'D'), dtype=np.datetime64)
            r2 = np.arange('2000-01-01', '2005-06-14', step=np.timedelta64(8, 'D'), dtype=np.datetime64)
            drv = gdal.GetDriverByName('ENVI')
    
            crs = osr.SpatialReference()
            crs.ImportFromEPSG(32633)
    
            assert isinstance(drv, gdal.Driver)
            datasets = []
            for i, r in enumerate([r1, r2]):
                p = '{}tmpstack{}.bsq'.format(vsiDir, i + 1)
    
                ds = drv.Create(p, ns, nl, len(r), eType=gdal.GDT_Float32)
                assert isinstance(ds, gdal.Dataset)
    
                ds.SetProjection(crs.ExportToWkt())
    
                dateString = ','.join([str(d) for d in r])
                dateString = '{{{}}}'.format(dateString)
                ds.SetMetadataItem('wavelength', dateString, 'ENVI')
    
                for b, date in enumerate(r):
                    decimalYear = date2num(date)
    
                    band = ds.GetRasterBand(b + 1)
                    assert isinstance(band, gdal.Band)
                    band.Fill(decimalYear)
                ds.FlushCache()
                datasets.append(p)
    
            return datasets
    
        @staticmethod
        def spectralProfiles(n):
            """
            Returns n random spectral profiles from the test data
            :return: lost of (N,3) array of floats specifying point locations.
            """
    
            files = file_search(DIR_EXAMPLES, '*.tif', recursive=True)
            results = []
            import random
            for file in random.choices(files, k=n):
                ds = gdal.Open(file)
                assert isinstance(ds, gdal.Dataset)
                b1 = ds.GetRasterBand(1)
                noData = b1.GetNoDataValue()
                assert isinstance(b1, gdal.Band)
                x = None
                y = None
                while x is None:
                    x = random.randint(0, ds.RasterXSize-1)
                    y = random.randint(0, ds.RasterYSize-1)
    
                    if noData is not None:
                        v = b1.ReadAsArray(x,y,1,1)
                        if v == noData:
                            x = None
                profile = ds.ReadAsArray(x,y,1,1).flatten()
                results.append(profile)
    
            return results