Skip to content
Snippets Groups Projects
pixelloader.py 15.6 KiB
Newer Older
# -*- coding: utf-8 -*-
"""
/***************************************************************************
                              HUB TimeSeriesViewer
                              -------------------
        begin                : 2015-08-20
        git sha              : $Format:%H$
        copyright            : (C) 2017 by HU-Berlin
        email                : benjamin.jakimow@geo.hu-berlin.de
 ***************************************************************************/

/***************************************************************************
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 ***************************************************************************/
"""
# noinspection PyPep8Naming
from __future__ import absolute_import
import os, sys, pickle
import datetime
from qgis.gui import *
from qgis.core import *
from PyQt4.QtCore import *
import numpy as np
from timeseriesviewer.utils import SpatialPoint, SpatialExtent, geo2px, px2geo
from osgeo import gdal, gdal_array, osr
    """
    Evaluates if a pixel is inside and image or onot
    :param ds: gdal.Dataset
    :param px: QPoint
    :return: True | False
    """
    if px.x() < 0 or px.x() >= ds.RasterXSize:
        return True
    if px.y() < 0 or px.y() >= ds.RasterYSize:
        return True
    return False

    """
    An object to store the results of an loading from a single raster source.
    """
    def __init__(self, source, geometries, bandIndices=None, **kwargs):
        """

        :param jobId: jobId number as given by the calling PixelLoader
        :param processId: processId, as managed by the calling PixelLoader
        :param geometry: SpatialPoint that describes the pixels to be loaded
        :param source: file path to raster image.
        :param kwargs: additional stuff returned, e.g. to identify somethin
        """
        assert isinstance(geometries, list)
        for geometry in geometries:
            assert type(geometry) in [SpatialExtent, SpatialPoint]

        assert os.path.isfile(source)


        #assert isinstance(source, str) or isinstance(source, unicode)
        self.sourcePath = source
        self.geometries = geometries
        self.bandIndices = bandIndices

        #for internal use only
        self._jobId = None
        self._processId = None
        self._done = False

        #for returned data
        self.resCrsWkt = None
        self.resGeoTransformation = None
        self.resProfiles = None
        self.resNoDataValues = None
        self.exception = None
        self.info = None
        for k in kwargs.keys():
            assert type(k) in [str, unicode]
            assert not k.startswith('_')
            if not k in self.__dict__.keys():
                self.__dict__[k] = kwargs[k]
        return QgsCoordinateReferenceSystem(self.resCrsWkt)
    def depr_imagePixelIndices(self):
        """
        Returns the image pixel indices related to the extracted value subset
        :return: (numpy array y, numpy array x)
        """
        if self.pxUL is None:
            return None

        pxIdxX = np.arange(0, self.pxSubsetSize.width()) + self.pxUL.x()
        pxIdxY = np.arange(0, self.pxSubsetSize.height()) + self.pxUL.y()
        return (pxIdxY,pxIdxX)

    def pixelResolution(self):
        """
        Returns the spatial pixel resolution
        :return: QSize
        """
        if self.resGeoTransformation is None:
        p1 = px2geo(QPoint(0, 0), self.resGeoTransformation)
        p2 = px2geo(QPoint(1, 1), self.resGeoTransformation)

        xRes = abs(p2.x() - p1.x())
        yRes = abs(p2.y() - p1.y())
        return QSize(xRes, yRes)

    def success(self):
        """
        :return:
        """
        return self._done and self.exception is None

    def __repr__(self):
        info = ['PixelLoaderTask:']
        if not self._done:
            info.append('not started...')
        else:
            if self.bandIndices:
                info.append('\tBandIndices {}:{}'.format(len(self.bandIndices), self.bandIndices))
            if self.resProfiles:
                info.append('\tProfileData: {}'.format(len(self.resProfiles)))
                for i, p in enumerate(self.resProfiles):
                    g = self.geometries[i]
                    d = self.resProfiles[i]
                    if d == INFO_OUT_OF_IMAGE:
                        info.append('\t{}: {}:{}'.format(i + 1, g, INFO_OUT_OF_IMAGE))
                    else:
                        vData = d[0]
                        vStd = d[1]
                        info.append('\t{}: {}:{} : {}'.format(i+1, g, vData.shape, vData))

        return '\n'.join(info)

LOADING_FINISHED = 'finished'
LOADING_CANCELED = 'canceled'
INFO_OUT_OF_IMAGE = 'out of image'

#def loadProfiles(pathsAndBandIndices, jobid, poolWorker, geom, q, cancelEvent, **kwargs):

def loadProfiles(taskList, queue, cancelEvent, **kwargs):
    for task in taskList:
        assert isinstance(task, PixelLoaderTask)
        if cancelEvent.is_set():
            return LOADING_CANCELED

        result = doLoaderTask(task)
        assert isinstance(result, PixelLoaderTask)
def transformPoint2Px(trans, pt, gt):
    x, y, _ = trans.TransformPoint(pt.x(), pt.y())
    return geo2px(QgsPoint(x, y), gt)
    assert isinstance(task, PixelLoaderTask)
    ds = gdal.Open(task.sourcePath, gdal.GA_ReadOnly)
    nb, ns, nl = ds.RasterCount, ds.RasterXSize, ds.RasterYSize
    bandIndices = list(range(nb)) if task.bandIndices is None else list(task.bandIndices)

    gt = ds.GetGeoTransform()
    result.resGeoTransformation = gt
    result.resCrsWkt = ds.GetProjection()
    crsSrc = osr.SpatialReference(result.resCrsWkt)


    #convert Geometries into pixel indices to be extracted
    PX_SUBSETS = []



    for geom in task.geometries:

        crsRequest = osr.SpatialReference()
        crsRequest.ImportFromWkt(geom.crs().toWkt())
        trans = osr.CoordinateTransformation(crsRequest, crsSrc)

        if isinstance(geom, QgsPoint):
            ptUL = ptLR = QgsPoint(geom)
        elif isinstance(geom, QgsRectangle):
            TYPE = 'RECTANGLE'
            ptUL = QgsPoint(geom.xMinimum(), geom.yMaximum())
            ptLR = QgsPoint(geom.xMaximum(), geom.yMinimum())
        else:
            PX_SUBSETS.append(INFO_OUT_OF_IMAGE)

        pxUL = transformPoint2Px(trans, ptUL, gt)
        pxLR = transformPoint2Px(trans, ptLR, gt)

        bUL = isOutOfImage(ds, pxUL)
        bLR = isOutOfImage(ds, pxLR)


        if (bUL or bLR) and not (bUL and bLR):
            #at least one point is inside the raster image:
            PX_SUBSETS.append(INFO_OUT_OF_IMAGE)
        if pxUL == pxLR:
            size_x = size_y = 1
        else:
            size_x = abs(pxUL.x() - pxLR.x())
            size_y = abs(pxUL.y() - pxLR.y())

        if size_x < 1: size_x = 1
        if size_y < 1: size_y = 1

        PX_SUBSETS.append((pxUL, pxUL, size_x, size_y))

    PROFILE_DATA = []

    if bandIndices == range(ds.RasterCount):
        #we have to extract all bands
        #in this case we use gdal.Dataset.ReadAsArray()

        for px in PX_SUBSETS:
            if px == INFO_OUT_OF_IMAGE:
                PROFILE_DATA.append(INFO_OUT_OF_IMAGE)
                continue

            pxUL, pxUL, size_x, size_y = px
            PROFILE_DATA.append(ds.ReadAsArray(pxUL.x(), pxUL.y(), size_x, size_y))
    else:
        # access band values band-by-band
        # in this case we use gdal.Band.ReadAsArray()
        # and need to iterate over the requested band indices

        #save the returned band values for each geometry in a separate list
        #empty list == invalid geometry
        for i in range(len(PX_SUBSETS)):
            if PX_SUBSETS[i] == INFO_OUT_OF_IMAGE:
                PROFILE_DATA.append(INFO_OUT_OF_IMAGE)
            else:
                PROFILE_DATA.append([])


        for bandIndex in bandIndices:
            band = ds.GetRasterBand(bandIndex+1)
            assert isinstance(band, gdal.Band)

            for i, px in enumerate(PX_SUBSETS):
                if px == INFO_OUT_OF_IMAGE:
                    continue
                pxUL, pxUL, size_x, size_y = px
                bandData = band.ReadAsArray(pxUL.x(), pxUL.y(), size_x, size_y)
                PROFILE_DATA[i].append(bandData)

        for i in range(len(PX_SUBSETS)):
            pd = PROFILE_DATA[i]
            if len(pd) == 0:
                PROFILE_DATA[i] = INFO_OUT_OF_IMAGE
            else:
                PROFILE_DATA[i] = np.dstack(pd).transpose(2,0,1)



    #finally, ensure that there is on 2D array only
    for i in range(len(PROFILE_DATA)):
        d = PROFILE_DATA[i]
        if d != INFO_OUT_OF_IMAGE:
            _, _, size_x, size_y = PX_SUBSETS[i]
            b, y, x = d.shape
            assert size_y == y
            assert size_x == x
            assert b == len(bandIndices)
            d = d.reshape((b, y*x))


            vMean = d.mean(axis=1)
            vStd = d.std(axis=1)
            PROFILE_DATA[i] = (vMean, vStd)

    task.resProfiles = PROFILE_DATA
    task._done = True
    return task



class PixelLoader(QObject):
    """
    Loads pixel from raster images
    """
    sigPixelLoaded = pyqtSignal(int, int, PixelLoaderTask)
    sigLoadingStarted = pyqtSignal(list)
    sigLoadingCanceled = pyqtSignal()
    _sigStartThreadWorkers = pyqtSignal(str)
    _sigTerminateThreadWorkers = pyqtSignal()

    def __init__(self, *args, **kwds):
        super(PixelLoader, self).__init__(*args, **kwds)
        self.filesList = []
        self.jobid = -1
        self.threadsAndWorkers = []
        self.queueChecker = QTimer()
        self.queueChecker.setInterval(3000)
        self.queueChecker.timeout.connect(self.checkQueue)

        self.pool = None
        self.mAsyncResults = []
        self.resultQueue = self.MGR.Queue()
        self.cancelEvent = self.MGR.Event()


    @QtCore.pyqtSlot(PixelLoaderTask)
    def onPixelLoaded(self, data):
        assert isinstance(data, PixelLoaderTask)
        if data._jobId != self.jobid:
            #do not return results from previous jobs...
            #print('got thread results from {} but need {}...'.format(jobid, self.jobid))
            return
            self.filesList.remove(data.sourcePath)
            self.sigPixelLoaded.emit(self.nMax - len(self.filesList), self.nMax, data)
            if len(self.filesList) == 0:
                dt = np.datetime64('now', 'ms') - self.mLoadingStartTime
                self.sigLoadingFinished.emit(dt)
    def setNumberOfProcesses(self, nProcesses):
        assert nProcesses >= 1
        self.nProcesses = nProcesses
    def startLoading(self, tasks):

        assert isinstance(tasks, list)

        paths = []
        for t in tasks:
            assert isinstance(t, PixelLoaderTask)
            paths.append(t.sourcePath)
        self.sigLoadingStarted.emit(paths[:])
        self.filesList.extend(paths)
        self.nMax = l
        self.nFailed = 0

        #split tasks into workpackages to be solve per parallel process
        workPackages = list()
        i = 0
        _tasks = tasks[:]
        while(len(_tasks)) > 0:
            if len(workPackages) <= i:
                workPackages.append([])
            task = _tasks.pop(0)
            task._jobId = self.jobid
            workPackages[i].append(task)
            i = i + 1 if i < self.nProcesses - 1 else 0

        from multiprocessing.pool import Pool

        if not DEBUG:
            if isinstance(self.pool, Pool):
                self.pool.terminate()
                self.pool = None

            self.pool = Pool(self.nProcesses)
            del self.mAsyncResults[:]
            self.queueChecker.start()

        #print('theGeometryWKT: '+theGeometry.crs().toWkt())
        for i, workPackage in enumerate(workPackages):
            #args = (workPackage, self.jobid, i, theGeometries, self.resultQueue, self.cancelEvent)
            # kwds = {'profileID':profileID}

            #set workpackage / thread-specific internal metdata
            for t in workPackage:
                assert isinstance(t, PixelLoaderTask)
                t.__processId = i


            args = (workPackage, self.resultQueue, self.cancelEvent)
            kwds = {}

                self.checkQueue(loadProfiles(*args, **kwds))
                r = self.pool.apply_async(loadProfiles, args=args, callback=self.checkQueue, **kwds)
                self.mAsyncResults.append(r)
        if not DEBUG:
            self.pool.close()

    def cancelLoading(self):
        self.cancelEvent.set()
    def isReadyToLoad(self):
        return len(self.mAsyncResults) == 0 and self.pool is None
    def checkQueue(self, *args):
        while not self.resultQueue.empty():
            md = self.resultQueue.get()
            self.onPixelLoaded(md)

        if all([w.ready() for w in self.mAsyncResults]):
            #print('All done')

            del self.mAsyncResults[:]
            self.pool = None
            if not self.cancelEvent.is_set():
        elif self.cancelEvent.is_set():
            self.queueChecker.stop()
            self.sigLoadingCanceled.emit()
if __name__ == '__main__':
    from timeseriesviewer.utils import initQgisApplication
    qgsApp = initQgisApplication()
    from PyQt4.QtGui import *
    gb = QGroupBox()
    gb.setTitle('Sandbox')
    PL = PixelLoader()
    PL.setNumberOfProcesses(1)

    import example.Images
    from timeseriesviewer import file_search
    dir = os.path.dirname(example.Images.__file__)
    #files = file_search(dir, '*.tif')
    files = [example.Images.Img_2014_05_07_LC82270652014127LGN00_BOA]
    files.extend(file_search(dir, 're_*.tif'))
    ext = SpatialExtent.fromRasterSource(files[0])

    from qgis.core import QgsPoint
    x,y = ext.center()


    geoms = [SpatialExtent(ext.crs(),x,y,x+250, y+70 ),
             SpatialPoint(ext.crs(), x,y),
             SpatialPoint(ext.crs(), x+250, y+70)]

    from multiprocessing import Pool

    def onPxLoaded(*args):
        n, nmax, task = args
        assert isinstance(task, PixelLoaderTask)
        print(task)

    PL = PixelLoader()
    def onDummy(*args):
        print(('dummy',args))

    def onTimer(*args):
        print(('TIMER',PL))
    PL.sigPixelLoaded.connect(onPxLoaded)
    PL.sigLoadingFinished.connect(lambda: onDummy('finished'))
    PL.sigLoadingCanceled.connect(lambda: onDummy('canceled'))
    PL.sigLoadingStarted.connect(lambda: onDummy('started'))
    PL.sigPixelLoaded.connect(lambda : onDummy('px loaded'))

    tasks = []
    for i, f in enumerate(files):
        kwargs = {'myid':'myID{}'.format(i)}
        tasks.append(PixelLoaderTask(f, geoms, bandIndices=None, **kwargs))

    PL.startLoading(tasks)

    #QTimer.singleShot(2000, lambda : PL.cancelLoading())
    qgsApp.exec_()