From 85a4cddf75ef0cf2a71a24e5925a9bc3b10b34b6 Mon Sep 17 00:00:00 2001 From: Benjamin Jakimow <benjamin.jakimow@geo.hu-berlin.de> Date: Thu, 15 Feb 2018 09:28:10 +0100 Subject: [PATCH] refactoring of + several improvements to PixelLoader --- timeseriesviewer/pixelloader.py | 397 +++++++++++++++++++++----------- 1 file changed, 261 insertions(+), 136 deletions(-) diff --git a/timeseriesviewer/pixelloader.py b/timeseriesviewer/pixelloader.py index 5286233b..adef618f 100644 --- a/timeseriesviewer/pixelloader.py +++ b/timeseriesviewer/pixelloader.py @@ -34,49 +34,69 @@ from osgeo import gdal, gdal_array, osr DEBUG = False def isOutOfImage(ds, px): + """ + Evaluates if a pixel is inside and image or onot + :param ds: gdal.Dataset + :param px: QPoint + :return: True | False + """ if px.x() < 0 or px.x() >= ds.RasterXSize: return True if px.y() < 0 or px.y() >= ds.RasterYSize: return True return False -class PixelLoaderResult(object): + + +class PixelLoaderTask(object): """ An object to store the results of an loading from a single raster source. """ - def __init__(self, jobId, processId, geometry, source ,**kwargs): - assert jobId is not None - assert processId is not None - assert type(geometry) in [SpatialExtent, SpatialPoint] - assert isinstance(source, str) or isinstance(source, unicode) - self.jobId = jobId - self.processId = processId - self.geometry = geometry - self.pxUL = self.pxSubsetSize = None - self.srcCrsWkt = None - self.geoTransformation = None - self.source = source - self.pxData = None - self.pxBandIndices = None - self.noDataValue = None + def __init__(self, source, geometries, bandIndices=None, **kwargs): + """ + + :param jobId: jobId number as given by the calling PixelLoader + :param processId: processId, as managed by the calling PixelLoader + :param geometry: SpatialPoint that describes the pixels to be loaded + :param source: file path to raster image. + :param kwargs: additional stuff returned, e.g. to identify somethin + """ + assert isinstance(geometries, list) + for geometry in geometries: + assert type(geometry) in [SpatialExtent, SpatialPoint] + + assert os.path.isfile(source) + + + #assert isinstance(source, str) or isinstance(source, unicode) + self.sourcePath = source + self.geometries = geometries + self.bandIndices = bandIndices + + #for internal use only + self._jobId = None + self._processId = None + self._done = False + + #for returned data + self.resCrsWkt = None + self.resGeoTransformation = None + self.resProfiles = None + self.resNoDataValues = None self.exception = None self.info = None + + #other, free keywords for k in kwargs.keys(): + assert type(k) in [str, unicode] + assert not k.startswith('_') if not k in self.__dict__.keys(): self.__dict__[k] = kwargs[k] - def setValues(self, values, bandIndices = None, noDataValue=None): - self.pxData = values - self.noDataValue = noDataValue - if bandIndices is None: - self.pxBandIndices = np.arange(self.pxData.shape[0]) - else: - self.pxBandIndices = bandIndices - def imageCrs(self): - return QgsCoordinateReferenceSystem(self.srcCrsWkt) + return QgsCoordinateReferenceSystem(self.resCrsWkt) - def imagePixelIndices(self): + def depr_imagePixelIndices(self): """ Returns the image pixel indices related to the extracted value subset :return: (numpy array y, numpy array x) @@ -93,120 +113,197 @@ class PixelLoaderResult(object): Returns the spatial pixel resolution :return: QSize """ - if self.geoTransformation is None: + if self.resGeoTransformation is None: return None - p1 = px2geo(QPoint(0, 0), self.geoTransformation) - p2 = px2geo(QPoint(1, 1), self.geoTransformation) + p1 = px2geo(QPoint(0, 0), self.resGeoTransformation) + p2 = px2geo(QPoint(1, 1), self.resGeoTransformation) xRes = abs(p2.x() - p1.x()) yRes = abs(p2.y() - p1.y()) return QSize(xRes, yRes) def success(self): - return self.pxData is not None and self.exception is None + """ + :return: + """ + return self._done and self.exception is None + + def __repr__(self): + info = ['PixelLoaderTask:'] + if not self._done: + info.append('not started...') + else: + if self.bandIndices: + info.append('\tBandIndices {}:{}'.format(len(self.bandIndices), self.bandIndices)) + if self.resProfiles: + info.append('\tProfileData: {}'.format(len(self.resProfiles))) + for i, p in enumerate(self.resProfiles): + g = self.geometries[i] + d = self.resProfiles[i] + if d == INFO_OUT_OF_IMAGE: + info.append('\t{}: {}:{}'.format(i + 1, g, INFO_OUT_OF_IMAGE)) + else: + vData = d[0] + vStd = d[1] + info.append('\t{}: {}:{} : {}'.format(i+1, g, vData.shape, vData)) + + return '\n'.join(info) LOADING_FINISHED = 'finished' LOADING_CANCELED = 'canceled' INFO_OUT_OF_IMAGE = 'out of image' -def loadProfiles(pathsAndBandIndices, jobid, poolWorker, geom, q, cancelEvent, **kwargs): - assert type(geom) in [SpatialPoint, SpatialExtent] - - #this routine might run in a parallel thread. - crsRequest = osr.SpatialReference() - crsRequest.ImportFromWkt(geom.crs().toWkt()) - - ptUL = ptLR = None - TYPE = None - if isinstance(geom, QgsPoint): - ptUL = QgsPoint(geom) - TYPE = 'PIXEL' - elif isinstance(geom, QgsRectangle): - TYPE = 'RECTANGLE' - ptUL = QgsPoint(geom.xMinimum(), geom.yMaximum()) - ptLR = QgsPoint(geom.xMaximum(), geom.yMinimum()) - else: - raise NotImplementedError() +#def loadProfiles(pathsAndBandIndices, jobid, poolWorker, geom, q, cancelEvent, **kwargs): + - for i, t in enumerate(pathsAndBandIndices): - path, bandIndices = t +def loadProfiles(taskList, queue, cancelEvent, **kwargs): + for task in taskList: + assert isinstance(task, PixelLoaderTask) if cancelEvent.is_set(): return LOADING_CANCELED - R = PixelLoaderResult(jobid, poolWorker, geom, path, **kwargs) + result = doLoaderTask(task) - try: - ds = gdal.Open(path, gdal.GA_ReadOnly) - gt = ds.GetGeoTransform() - R.srcCrsWkt = ds.GetProjection() - R.geoTransformation = gt - crsSrc = osr.SpatialReference(R.srcCrsWkt) - #print('SRC {} WKT:{}'.format(i, R.srcCrsWkt)) - trans = osr.CoordinateTransformation(crsRequest, crsSrc) + assert isinstance(result, PixelLoaderTask) - def transformPoint2Px(trans, pt, gt): - x,y,_ = trans.TransformPoint(pt.x(),pt.y()) - return geo2px(QgsPoint(x,y), gt) + queue.put(result) + return LOADING_FINISHED - if TYPE == 'PIXEL': - px = transformPoint2Px(trans, ptUL, gt) - if isOutOfImage(ds, px): - R.info = INFO_OUT_OF_IMAGE - q.put(R) - continue +def transformPoint2Px(trans, pt, gt): + x, y, _ = trans.TransformPoint(pt.x(), pt.y()) + return geo2px(QgsPoint(x, y), gt) - size_x = 1 - size_y = 1 +def doLoaderTask(task): - elif TYPE == 'RECTANGLE': - px = transformPoint2Px(trans, ptUL, gt) - px2 = transformPoint2Px(trans, ptLR, gt) + assert isinstance(task, PixelLoaderTask) - #todo: cut to existing pixel coordinates - #if px2.x() > 0: px.setX(max([0, px.x()])) + result = task - if isOutOfImage(ds, px) or \ - isOutOfImage(ds, px2): - R.info = INFO_OUT_OF_IMAGE - q.put(R) - continue + ds = gdal.Open(task.sourcePath, gdal.GA_ReadOnly) + nb, ns, nl = ds.RasterCount, ds.RasterXSize, ds.RasterYSize - size_x = px2.x() - px.x() - size_y = px.y() - px2.y() - R.pxUL = px - R.pxSubsetSize = QSize(size_x, size_y) - values = None - noData = None - if bandIndices is None: - values = ds.ReadAsArray(px.x(), px.y(), size_x, size_y) - noData = [ds.GetRasterBand(b + 1).GetNoDataValue() for b in range(ds.RasterCount)] - else: - bandIndices = sorted(list(set(bandIndices))) - noData = [] - for j, b in enumerate(bandIndices): - band = ds.GetRasterBand(b+1) - bandData = band.ReadAsArray(px.x(), px.y(), size_x, size_y) - if values is None: - values = np.empty((len(bandIndices), size_y, size_x), dtype=bandData.dtype) - - noData.append(band.GetNoDataValue()) - values[j,:] = bandData - - assert values.ndim == 3 - - except Exception as ex: - R.exception = ex - q.put(R) + bandIndices = list(range(nb)) if task.bandIndices is None else list(task.bandIndices) + + gt = ds.GetGeoTransform() + result.resGeoTransformation = gt + result.resCrsWkt = ds.GetProjection() + crsSrc = osr.SpatialReference(result.resCrsWkt) + + + #convert Geometries into pixel indices to be extracted + PX_SUBSETS = [] + + + + for geom in task.geometries: + + crsRequest = osr.SpatialReference() + crsRequest.ImportFromWkt(geom.crs().toWkt()) + trans = osr.CoordinateTransformation(crsRequest, crsSrc) + + if isinstance(geom, QgsPoint): + ptUL = ptLR = QgsPoint(geom) + elif isinstance(geom, QgsRectangle): + TYPE = 'RECTANGLE' + ptUL = QgsPoint(geom.xMinimum(), geom.yMaximum()) + ptLR = QgsPoint(geom.xMaximum(), geom.yMinimum()) + else: + PX_SUBSETS.append(INFO_OUT_OF_IMAGE) + + pxUL = transformPoint2Px(trans, ptUL, gt) + pxLR = transformPoint2Px(trans, ptLR, gt) + + bUL = isOutOfImage(ds, pxUL) + bLR = isOutOfImage(ds, pxLR) + + + if (bUL or bLR) and not (bUL and bLR): + #at least one point is inside the raster image: + PX_SUBSETS.append(INFO_OUT_OF_IMAGE) continue - R.setValues(values, bandIndices=bandIndices, noDataValue=noData) + if pxUL == pxLR: + size_x = size_y = 1 + else: + size_x = abs(pxUL.x() - pxLR.x()) + size_y = abs(pxUL.y() - pxLR.y()) + + if size_x < 1: size_x = 1 + if size_y < 1: size_y = 1 + + PX_SUBSETS.append((pxUL, pxUL, size_x, size_y)) + + PROFILE_DATA = [] + + if bandIndices == range(ds.RasterCount): + #we have to extract all bands + #in this case we use gdal.Dataset.ReadAsArray() + + for px in PX_SUBSETS: + if px == INFO_OUT_OF_IMAGE: + PROFILE_DATA.append(INFO_OUT_OF_IMAGE) + continue + + pxUL, pxUL, size_x, size_y = px + PROFILE_DATA.append(ds.ReadAsArray(pxUL.x(), pxUL.y(), size_x, size_y)) + else: + # access band values band-by-band + # in this case we use gdal.Band.ReadAsArray() + # and need to iterate over the requested band indices + + #save the returned band values for each geometry in a separate list + #empty list == invalid geometry + for i in range(len(PX_SUBSETS)): + if PX_SUBSETS[i] == INFO_OUT_OF_IMAGE: + PROFILE_DATA.append(INFO_OUT_OF_IMAGE) + else: + PROFILE_DATA.append([]) + + + for bandIndex in bandIndices: + band = ds.GetRasterBand(bandIndex+1) + assert isinstance(band, gdal.Band) + + for i, px in enumerate(PX_SUBSETS): + if px == INFO_OUT_OF_IMAGE: + continue + pxUL, pxUL, size_x, size_y = px + bandData = band.ReadAsArray(pxUL.x(), pxUL.y(), size_x, size_y) + PROFILE_DATA[i].append(bandData) + + for i in range(len(PX_SUBSETS)): + pd = PROFILE_DATA[i] + if len(pd) == 0: + PROFILE_DATA[i] = INFO_OUT_OF_IMAGE + else: + PROFILE_DATA[i] = np.dstack(pd).transpose(2,0,1) + + + + #finally, ensure that there is on 2D array only + for i in range(len(PROFILE_DATA)): + d = PROFILE_DATA[i] + if d != INFO_OUT_OF_IMAGE: + _, _, size_x, size_y = PX_SUBSETS[i] + b, y, x = d.shape + assert size_y == y + assert size_x == x + assert b == len(bandIndices) + d = d.reshape((b, y*x)) + + + vMean = d.mean(axis=1) + vStd = d.std(axis=1) + PROFILE_DATA[i] = (vMean, vStd) + + task.resProfiles = PROFILE_DATA + task._done = True + return task - q.put(R) - return LOADING_FINISHED @@ -215,7 +312,7 @@ class PixelLoader(QObject): Loads pixel from raster images """ - sigPixelLoaded = pyqtSignal(int, int, PixelLoaderResult) + sigPixelLoaded = pyqtSignal(int, int, PixelLoaderTask) sigLoadingStarted = pyqtSignal(list) sigLoadingFinished = pyqtSignal(np.timedelta64) sigLoadingCanceled = pyqtSignal() @@ -243,15 +340,15 @@ class PixelLoader(QObject): self.cancelEvent = self.MGR.Event() - @QtCore.pyqtSlot(PixelLoaderResult) + @QtCore.pyqtSlot(PixelLoaderTask) def onPixelLoaded(self, data): - assert isinstance(data, PixelLoaderResult) - if data.jobId != self.jobid: + assert isinstance(data, PixelLoaderTask) + if data._jobId != self.jobid: #do not return results from previous jobs... #print('got thread results from {} but need {}...'.format(jobid, self.jobid)) return else: - self.filesList.remove(data.source) + self.filesList.remove(data.sourcePath) self.sigPixelLoaded.emit(self.nMax - len(self.filesList), self.nMax, data) if len(self.filesList) == 0: @@ -263,14 +360,15 @@ class PixelLoader(QObject): assert nProcesses >= 1 self.nProcesses = nProcesses - def startLoading(self, paths, theGeometry, bandIndices=None, profileID=''): - assert isinstance(paths, list) - if bandIndices is not None: - assert len(bandIndices) == len(paths) - else: - bandIndices = [None for _ in paths] + def startLoading(self, tasks): + + assert isinstance(tasks, list) + + paths = [] + for t in tasks: + assert isinstance(t, PixelLoaderTask) + paths.append(t.sourcePath) - assert type(theGeometry) in [SpatialPoint, SpatialExtent] self.mLoadingStartTime = np.datetime64('now', 'ms') self.jobid += 1 self.sigLoadingStarted.emit(paths[:]) @@ -280,14 +378,16 @@ class PixelLoader(QObject): self.nMax = l self.nFailed = 0 - #split number of files into list - _pathsAndIndices = zip(paths, bandIndices) + #split tasks into workpackages to be solve per parallel process workPackages = list() i = 0 - while(len(_pathsAndIndices)) > 0: + _tasks = tasks[:] + while(len(_tasks)) > 0: if len(workPackages) <= i: workPackages.append([]) - workPackages[i].append(_pathsAndIndices.pop(0)) + task = _tasks.pop(0) + task._jobId = self.jobid + workPackages[i].append(task) i = i + 1 if i < self.nProcesses - 1 else 0 from multiprocessing.pool import Pool @@ -303,14 +403,25 @@ class PixelLoader(QObject): #print('theGeometryWKT: '+theGeometry.crs().toWkt()) for i, workPackage in enumerate(workPackages): - args = (workPackage, self.jobid, i, theGeometry, self.resultQueue, self.cancelEvent) - kwds = {'profileID':profileID} + #args = (workPackage, self.jobid, i, theGeometries, self.resultQueue, self.cancelEvent) + # kwds = {'profileID':profileID} + + #set workpackage / thread-specific internal metdata + for t in workPackage: + assert isinstance(t, PixelLoaderTask) + t.__processId = i + + + args = (workPackage, self.resultQueue, self.cancelEvent) + kwds = {} + if DEBUG: self.checkQueue(loadProfiles(*args, **kwds)) else: r = self.pool.apply_async(loadProfiles, args=args, callback=self.checkQueue, **kwds) self.mAsyncResults.append(r) + if not DEBUG: self.pool.close() @@ -345,12 +456,12 @@ class PixelLoader(QObject): if __name__ == '__main__': - from timeseriesviewer.sandbox import initQgisEnvironment - qgsApp = initQgisEnvironment() + from timeseriesviewer.utils import initQgisApplication + qgsApp = initQgisApplication() from PyQt4.QtGui import * gb = QGroupBox() gb.setTitle('Sandbox') - + DEBUG = True PL = PixelLoader() PL.setNumberOfProcesses(1) @@ -361,19 +472,27 @@ if __name__ == '__main__': files = [example.Images.Img_2014_05_07_LC82270652014127LGN00_BOA] files.extend(file_search(dir, 're_*.tif')) ext = SpatialExtent.fromRasterSource(files[0]) - pt = SpatialPoint(ext.crs(), ext.center()) + + from qgis.core import QgsPoint + x,y = ext.center() + + + geoms = [SpatialExtent(ext.crs(),x,y,x+250, y+70 ), + SpatialPoint(ext.crs(), x,y), + SpatialPoint(ext.crs(), x+250, y+70)] from multiprocessing import Pool def onPxLoaded(*args): - n, nmax, plr = args - assert isinstance(plr, PixelLoaderResult) + n, nmax, task = args + assert isinstance(task, PixelLoaderTask) + print(task) PL = PixelLoader() def onDummy(*args): print(('dummy',args)) - pass + s ="" def onTimer(*args): print(('TIMER',PL)) pass @@ -382,9 +501,15 @@ if __name__ == '__main__': PL.sigLoadingCanceled.connect(lambda: onDummy('canceled')) PL.sigLoadingStarted.connect(lambda: onDummy('started')) PL.sigPixelLoaded.connect(lambda : onDummy('px loaded')) - PL.startLoading(files, pt) - QTimer.singleShot(2000, lambda : PL.cancelLoading()) + tasks = [] + for i, f in enumerate(files): + kwargs = {'myid':'myID{}'.format(i)} + tasks.append(PixelLoaderTask(f, geoms, bandIndices=None, **kwargs)) + + PL.startLoading(tasks) + + #QTimer.singleShot(2000, lambda : PL.cancelLoading()) qgsApp.exec_() s = "" \ No newline at end of file -- GitLab