Newer
Older
# coding=utf-8
"""Safe Translations Test.
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
"""
from eotimeseriesviewer.pixelloader import *
__author__ = 'benjamin.jakimow@geo.hu-berlin.de'
import unittest
import os, sys, pickle
import qgis.testing
from eotimeseriesviewer.tests import initQgisApplication
from eotimeseriesviewer import *
from eotimeseriesviewer.tests import TestObjects, initQgisApplication
QGIS_APP = initQgisApplication()
SHOW_GUI = True and os.environ.get('CI') is None
def onDummy(*args):
print(('dummy', args))
def waitForEnd(pl:PixelLoader):
print('wait for end...')
while QgsApplication.taskManager().countActiveTasks() > 0:
QCoreApplication.processEvents()
QCoreApplication.processEvents()
print('done')
class PixelLoaderTest(unittest.TestCase):
"""Test translations work."""
@classmethod
def setUpClass(cls):
from eotimeseriesviewer import DIR_EXAMPLES
cls.imgs = file_search(DIR_EXAMPLES, '*.tif', recursive=True)
cls.img1 = list(cls.imgs)[0]
ds = gdal.Open(cls.img1)
assert isinstance(ds, gdal.Dataset)
nb, nl, ns = ds.RasterCount, ds.RasterYSize, ds.RasterXSize
cls.img1Shape = (nb, nl, ns)
cls.img1gt = ds.GetGeoTransform()
cls.img1ProfileUL = ds.ReadAsArray(0, 0, 1, 1)
cls.img1ProfileLR = ds.ReadAsArray(ns - 1, nl - 1, 1, 1)
ds = None
@classmethod
def tearDownClass(cls):
pass
def setUp(self):
"""Runs before each test."""
pass
def tearDown(self):
"""Runs after each test."""
pass
def createTestImageSeries(self, n=10)->list:
return TestObjects.createTestImageSeries(n=n)
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
def test_QgsTaskManager(self):
nTasks = 0
result = None
def countTasks(*args):
nonlocal nTasks
nTasks += 1
tm = QgsApplication.taskManager()
self.assertIsInstance(tm, QgsTaskManager)
tm.taskAdded.connect(countTasks)
def func1(taskWrapper:QgsTaskWrapper):
return 'Hello'
def onFinished(e, value):
nonlocal result
assert e is None
result = value
task = QgsTask.fromFunction('',func1, on_finished = onFinished)
self.assertIsInstance(task, QgsTask)
tm.addTask(task)
while task.status() not in [QgsTask.Complete, QgsTask.Terminated]:
pass
while QgsApplication.taskManager().countActiveTasks() > 0:
QCoreApplication.processEvents()
self.assertTrue(nTasks == 1)
self.assertTrue(result == 'Hello')
def test_PixelLoader(self):
paths = [i.GetFileList()[0] for i in images]
rl = QgsRasterLayer(paths[0])
self.assertIsInstance(rl, QgsRasterLayer)
self.assertTrue(rl.isValid())
center = SpatialPoint.fromMapLayerCenter(rl)
cleft = SpatialPoint(center.crs(), center.x()-100, center.y())
geometries = [center, cleft]
pl = PixelLoader()
self.assertIsInstance(pl, PixelLoader)
tasks = []
for p in paths:
task = PixelLoaderTask(p, geometries)
tasks.append(task)
pl.startLoading(tasks)
from eotimeseriesviewer.pixelloader import doLoaderTask, PixelLoaderTask, INFO_OUT_OF_IMAGE, INFO_NO_DATA
source = example.Images.Img_2014_05_15_LE72270652014135CUB00_BOA
ext = SpatialExtent.fromRasterSource(source)
ds = gdal.Open(source)
gt = ds.GetGeoTransform()
wkt = ds.GetProjection()
ds = None
ptNoData = px2geo(QgsPointXY(66,2), gt)
pt = SpatialPoint.fromSpatialExtent(ext)
x,y = pt.x(), pt.y()
ptOutOfImage = SpatialExtent(ext.crs(), x + 10000, y, x + 12000, y + 70) # out of image
ptNoData = SpatialPoint(ext.crs(), ptNoData)
ptValid1 = SpatialPoint(ext.crs(), x, y)
ptValid2 = SpatialPoint(ext.crs(), x+50, y+50)
#test a valid pixels
plt = PixelLoaderTask(source, [ptValid1, ptValid2])
task = QgsTask.fromFunction('', doLoaderTask, plt)
result = doLoaderTask(task, plt.toDump())
result = PixelLoaderTask.fromDump(result)
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
except Exception as ex:
self.fail('Failed to return the pixels for two geometries')
self.assertIsInstance(result, PixelLoaderTask)
self.assertTrue(result.success())
self.assertEqual(result.sourcePath, source)
self.assertSequenceEqual(result.bandIndices, [0,1,2,3,4,5])
self.assertIs(result.exception, None)
self.assertEqual(result.resCrsWkt, wkt)
self.assertEqual(result.resGeoTransformation, gt)
self.assertEqual(result.resNoDataValues, None)
self.assertIsInstance(result.resProfiles, list)
self.assertEqual(len(result.resProfiles), 2, msg='did not return results for two geometries.')
for i in range(len(result.resProfiles)):
dn, stdDev = result.resProfiles[i]
self.assertIsInstance(dn, np.ndarray)
self.assertIsInstance(stdDev, np.ndarray)
self.assertEqual(len(dn), len(stdDev))
self.assertEqual(len(dn), 6)
self.assertEqual(stdDev.max(), 0) #std deviation of a single pixel is always zero
self.assertEqual(stdDev.min(), 0)
def test_loadProfiles(self):
from eotimeseriesviewer import SpatialPoint, SpatialExtent, px2geo
img1 = self.img1
nb, nl, ns = self.img1Shape
gt = self.img1gt
ext = SpatialExtent.fromRasterSource(img1)
ptUL = SpatialPoint(ext.crs(), *ext.upperLeft())
ptLR = px2geo(QPoint(ns-1, nl-1), gt)
ptLR = SpatialPoint(ext.crs(), ptLR)
ptOutOfImage = SpatialPoint(ext.crs(), px2geo(QPoint(-1,-1), gt))
dir = os.path.dirname(example.Images.__file__)
# files = file_search(dir, '*.tif')
files = [example.Images.Img_2014_05_07_LC82270652014127LGN00_BOA]
files.append(example.Images.Img_2014_04_29_LE72270652014119CUB00_BOA)
files.extend(file_search(dir, 're_*.tif'))
for f in files: print(f)
ext = SpatialExtent.fromRasterSource(files[0])
x, y = ext.center()
# SpatialPoint(ext.crs(), 681151.214,-752388.476), #nodata in Img_2014_04_29_LE72270652014119CUB00_BOA
SpatialExtent(ext.crs(), x + 10000, y, x + 12000, y + 70), # out of image
SpatialExtent(ext.crs(), x, y, x + 10000, y + 70),
]
geoms2 = [SpatialPoint(ext.crs(), x, y),
SpatialPoint(ext.crs(), x + 250, y + 70)]
nonlocal loaded_values
self.assertIsInstance(task, PixelLoaderTask)
loaded_values.append(task)
PL.sigPixelLoaded.connect(onPxLoaded)
PL.sigLoadingFinished.connect(lambda: onDummy('finished'))
PL.sigLoadingStarted.connect(lambda: onDummy('started'))
tasks1 = []
for i, f in enumerate(files):
kwargs = {'myid': 'myID{}'.format(i)}
tasks1.append(PixelLoaderTask(f, geoms1, bandIndices=None, **kwargs))
tasks2 = []
for i, f in enumerate(files):
kwargs = {'myid': 'myID{}'.format(i)}
tasks2.append(PixelLoaderTask(f, geoms2, bandIndices=None, **kwargs))
PL.startLoading(tasks1)
PL.startLoading(tasks2)
self.assertTrue(len(loaded_values) == len(tasks2)+len(tasks1))
if __name__ == "__main__":
SHOW_GUI = False and os.environ.get('CI') is None