Skip to content
Snippets Groups Projects
test_pixelloader.py 6.38 KiB
Newer Older
  • Learn to ignore specific revisions
  • # coding=utf-8
    """Safe Translations Test.
    
    .. note:: This program is free software; you can redistribute it and/or modify
         it under the terms of the GNU General Public License as published by
         the Free Software Foundation; either version 2 of the License, or
         (at your option) any later version.
    
    """
    
    from timeseriesviewer.pixelloader import *
    __author__ = 'benjamin.jakimow@geo.hu-berlin.de'
    
    import unittest
    import os, sys, pickle
    
    
    from timeseriesviewer.utils import initQgisApplication
    
    import example.Images
    
    Benjamin Jakimow's avatar
    Benjamin Jakimow committed
    from timeseriesviewer.utils import file_search
    
    QGIS_APP = initQgisApplication()
    
    def onDummy(*args):
        print(('dummy', args))
    
    
    
    class PixelLoaderTest(unittest.TestCase):
        """Test translations work."""
    
        @classmethod
        def setUpClass(cls):
    
    Benjamin Jakimow's avatar
    Benjamin Jakimow committed
            from timeseriesviewer import DIR_EXAMPLES
            from timeseriesviewer.utils import file_search
    
            cls.imgs = file_search(DIR_EXAMPLES, '*.tif', recursive=True)
            cls.img1 = cls.imgs[0]
            ds = gdal.Open(cls.img1)
            assert isinstance(ds, gdal.Dataset)
            nb, nl, ns = ds.RasterCount, ds.RasterYSize, ds.RasterXSize
            cls.img1Shape = (nb, nl, ns)
            cls.img1gt = ds.GetGeoTransform()
            cls.img1ProfileUL = ds.ReadAsArray(0, 0, 1, 1)
            cls.img1ProfileLR = ds.ReadAsArray(ns - 1, nl - 1, 1, 1)
            ds = None
    
        @classmethod
        def tearDownClass(cls):
            pass
    
        def setUp(self):
            """Runs before each test."""
            pass
        def tearDown(self):
            """Runs after each test."""
            pass
    
    
    
    
    
        def test_pixelLoader(self):
            from timeseriesviewer.pixelloader import doLoaderTask, PixelLoaderTask, INFO_OUT_OF_IMAGE, INFO_NO_DATA
            from timeseriesviewer.utils import px2geo
            source = example.Images.Img_2014_05_15_LE72270652014135CUB00_BOA
    
            ext = SpatialExtent.fromRasterSource(source)
            ds = gdal.Open(source)
    
            gt = ds.GetGeoTransform()
            wkt = ds.GetProjection()
            ds = None
            ptNoData = px2geo(QgsPointXY(66,2), gt)
    
            pt = SpatialPoint.fromSpatialExtent(ext)
            x,y = pt.x(), pt.y()
            ptOutOfImage = SpatialExtent(ext.crs(), x + 10000, y, x + 12000, y + 70)  # out of image
            ptNoData     = SpatialPoint(ext.crs(), ptNoData)
            ptValid1      = SpatialPoint(ext.crs(), x, y)
            ptValid2 = SpatialPoint(ext.crs(), x+50, y+50)
    
            #test a valid pixels
    
            try:
                result = doLoaderTask(PixelLoaderTask(source, [ptValid1, ptValid2]))
            except Exception as ex:
                self.fail('Failed to return the pixels for two geometries')
    
            self.assertIsInstance(result, PixelLoaderTask)
            self.assertTrue(result.success())
            self.assertEqual(result.sourcePath, source)
            self.assertSequenceEqual(result.bandIndices, [0,1,2,3,4,5])
            self.assertIs(result.exception, None)
    
    
    
            self.assertEqual(result.resCrsWkt, wkt)
            self.assertEqual(result.resGeoTransformation, gt)
            self.assertEqual(result.resNoDataValues, None)
            self.assertIsInstance(result.resProfiles, list)
            self.assertEqual(len(result.resProfiles), 2, msg='did not return results for two geometries.')
    
            for i in range(len(result.resProfiles)):
                dn, stdDev = result.resProfiles[i]
                self.assertIsInstance(dn, np.ndarray)
                self.assertIsInstance(stdDev, np.ndarray)
    
                self.assertEqual(len(dn), len(stdDev))
                self.assertEqual(len(dn), 6)
                self.assertEqual(stdDev.max(), 0) #std deviation of a single pixel is always zero
                self.assertEqual(stdDev.min(), 0)
    
    
    
            #test a out-of-image geometry
            result = doLoaderTask(PixelLoaderTask(source, ptOutOfImage))
            self.assertTrue(result.success())
            self.assertEqual(result.resProfiles[0], INFO_OUT_OF_IMAGE)
    
            result = doLoaderTask(PixelLoaderTask(source, ptNoData))
            self.assertTrue(result.success())
            self.assertEqual(result.resProfiles[0], INFO_NO_DATA)
    
    
        def test_loadProfiles(self):
    
            from timeseriesviewer.utils import SpatialPoint, SpatialExtent, px2geo
    
    
    
            img1 = self.img1
            nb, nl, ns = self.img1Shape
            gt = self.img1gt
    
            ext = SpatialExtent.fromRasterSource(img1)
    
            ptUL = SpatialPoint(ext.crs(), *ext.upperLeft())
            ptLR = px2geo(QPoint(ns-1, nl-1), gt)
            ptLR = SpatialPoint(ext.crs(), ptLR)
            ptOutOfImage = SpatialPoint(ext.crs(), px2geo(QPoint(-1,-1), gt))
    
    
            dir = os.path.dirname(example.Images.__file__)
            # files = file_search(dir, '*.tif')
            files = [example.Images.Img_2014_05_07_LC82270652014127LGN00_BOA]
            files.append(example.Images.Img_2014_04_29_LE72270652014119CUB00_BOA)
            files.extend(file_search(dir, 're_*.tif'))
            for f in files: print(f)
            ext = SpatialExtent.fromRasterSource(files[0])
            x, y = ext.center()
    
    
                # SpatialPoint(ext.crs(), 681151.214,-752388.476), #nodata in Img_2014_04_29_LE72270652014119CUB00_BOA
                SpatialExtent(ext.crs(), x + 10000, y, x + 12000, y + 70),  # out of image
                SpatialExtent(ext.crs(), x, y, x + 10000, y + 70),
    
                ]
    
            geoms2 = [SpatialPoint(ext.crs(), x, y),
                      SpatialPoint(ext.crs(), x + 250, y + 70)]
    
    
            from multiprocessing import Pool
    
            def onPxLoaded(*args):
                n, nmax, task = args
                assert isinstance(task, PixelLoaderTask)
                print(task)
    
            PL = PixelLoader()
    
    
            def onTimer(*args):
                print(('TIMER', PL))
                pass
    
            PL.sigPixelLoaded.connect(onPxLoaded)
            PL.sigLoadingFinished.connect(lambda: onDummy('finished'))
            PL.sigLoadingCanceled.connect(lambda: onDummy('canceled'))
            PL.sigLoadingStarted.connect(lambda: onDummy('started'))
            PL.sigPixelLoaded.connect(lambda: onDummy('px loaded'))
    
    
            tasks1 = []
            for i, f in enumerate(files):
                kwargs = {'myid': 'myID{}'.format(i)}
                tasks1.append(PixelLoaderTask(f, geoms1, bandIndices=None, **kwargs))
    
            tasks2 = []
    
            for i, f in enumerate(files):
                kwargs = {'myid': 'myID{}'.format(i)}
    
                tasks2.append(PixelLoaderTask(f, geoms2, bandIndices=None, **kwargs))
    
    
            for t in tasks1:
                result = doLoaderTask(t)
                s = ""
    
    
            PL.startLoading(tasks1)
            PL.startLoading(tasks2)
    
            print('DONE')
    
        unittest.main()