Skip to content
Snippets Groups Projects
Commit 85a4cddf authored by Benjamin Jakimow's avatar Benjamin Jakimow
Browse files

refactoring of + several improvements to PixelLoader

parent 45ff9baf
No related branches found
No related tags found
No related merge requests found
...@@ -34,49 +34,69 @@ from osgeo import gdal, gdal_array, osr ...@@ -34,49 +34,69 @@ from osgeo import gdal, gdal_array, osr
DEBUG = False DEBUG = False
def isOutOfImage(ds, px): def isOutOfImage(ds, px):
"""
Evaluates if a pixel is inside and image or onot
:param ds: gdal.Dataset
:param px: QPoint
:return: True | False
"""
if px.x() < 0 or px.x() >= ds.RasterXSize: if px.x() < 0 or px.x() >= ds.RasterXSize:
return True return True
if px.y() < 0 or px.y() >= ds.RasterYSize: if px.y() < 0 or px.y() >= ds.RasterYSize:
return True return True
return False return False
class PixelLoaderResult(object):
class PixelLoaderTask(object):
""" """
An object to store the results of an loading from a single raster source. An object to store the results of an loading from a single raster source.
""" """
def __init__(self, jobId, processId, geometry, source ,**kwargs): def __init__(self, source, geometries, bandIndices=None, **kwargs):
assert jobId is not None """
assert processId is not None
assert type(geometry) in [SpatialExtent, SpatialPoint] :param jobId: jobId number as given by the calling PixelLoader
assert isinstance(source, str) or isinstance(source, unicode) :param processId: processId, as managed by the calling PixelLoader
self.jobId = jobId :param geometry: SpatialPoint that describes the pixels to be loaded
self.processId = processId :param source: file path to raster image.
self.geometry = geometry :param kwargs: additional stuff returned, e.g. to identify somethin
self.pxUL = self.pxSubsetSize = None """
self.srcCrsWkt = None assert isinstance(geometries, list)
self.geoTransformation = None for geometry in geometries:
self.source = source assert type(geometry) in [SpatialExtent, SpatialPoint]
self.pxData = None
self.pxBandIndices = None assert os.path.isfile(source)
self.noDataValue = None
#assert isinstance(source, str) or isinstance(source, unicode)
self.sourcePath = source
self.geometries = geometries
self.bandIndices = bandIndices
#for internal use only
self._jobId = None
self._processId = None
self._done = False
#for returned data
self.resCrsWkt = None
self.resGeoTransformation = None
self.resProfiles = None
self.resNoDataValues = None
self.exception = None self.exception = None
self.info = None self.info = None
#other, free keywords
for k in kwargs.keys(): for k in kwargs.keys():
assert type(k) in [str, unicode]
assert not k.startswith('_')
if not k in self.__dict__.keys(): if not k in self.__dict__.keys():
self.__dict__[k] = kwargs[k] self.__dict__[k] = kwargs[k]
def setValues(self, values, bandIndices = None, noDataValue=None):
self.pxData = values
self.noDataValue = noDataValue
if bandIndices is None:
self.pxBandIndices = np.arange(self.pxData.shape[0])
else:
self.pxBandIndices = bandIndices
def imageCrs(self): def imageCrs(self):
return QgsCoordinateReferenceSystem(self.srcCrsWkt) return QgsCoordinateReferenceSystem(self.resCrsWkt)
def imagePixelIndices(self): def depr_imagePixelIndices(self):
""" """
Returns the image pixel indices related to the extracted value subset Returns the image pixel indices related to the extracted value subset
:return: (numpy array y, numpy array x) :return: (numpy array y, numpy array x)
...@@ -93,120 +113,197 @@ class PixelLoaderResult(object): ...@@ -93,120 +113,197 @@ class PixelLoaderResult(object):
Returns the spatial pixel resolution Returns the spatial pixel resolution
:return: QSize :return: QSize
""" """
if self.geoTransformation is None: if self.resGeoTransformation is None:
return None return None
p1 = px2geo(QPoint(0, 0), self.geoTransformation) p1 = px2geo(QPoint(0, 0), self.resGeoTransformation)
p2 = px2geo(QPoint(1, 1), self.geoTransformation) p2 = px2geo(QPoint(1, 1), self.resGeoTransformation)
xRes = abs(p2.x() - p1.x()) xRes = abs(p2.x() - p1.x())
yRes = abs(p2.y() - p1.y()) yRes = abs(p2.y() - p1.y())
return QSize(xRes, yRes) return QSize(xRes, yRes)
def success(self): def success(self):
return self.pxData is not None and self.exception is None """
:return:
"""
return self._done and self.exception is None
def __repr__(self):
info = ['PixelLoaderTask:']
if not self._done:
info.append('not started...')
else:
if self.bandIndices:
info.append('\tBandIndices {}:{}'.format(len(self.bandIndices), self.bandIndices))
if self.resProfiles:
info.append('\tProfileData: {}'.format(len(self.resProfiles)))
for i, p in enumerate(self.resProfiles):
g = self.geometries[i]
d = self.resProfiles[i]
if d == INFO_OUT_OF_IMAGE:
info.append('\t{}: {}:{}'.format(i + 1, g, INFO_OUT_OF_IMAGE))
else:
vData = d[0]
vStd = d[1]
info.append('\t{}: {}:{} : {}'.format(i+1, g, vData.shape, vData))
return '\n'.join(info)
LOADING_FINISHED = 'finished' LOADING_FINISHED = 'finished'
LOADING_CANCELED = 'canceled' LOADING_CANCELED = 'canceled'
INFO_OUT_OF_IMAGE = 'out of image' INFO_OUT_OF_IMAGE = 'out of image'
def loadProfiles(pathsAndBandIndices, jobid, poolWorker, geom, q, cancelEvent, **kwargs): #def loadProfiles(pathsAndBandIndices, jobid, poolWorker, geom, q, cancelEvent, **kwargs):
assert type(geom) in [SpatialPoint, SpatialExtent]
#this routine might run in a parallel thread.
crsRequest = osr.SpatialReference()
crsRequest.ImportFromWkt(geom.crs().toWkt())
ptUL = ptLR = None
TYPE = None
if isinstance(geom, QgsPoint):
ptUL = QgsPoint(geom)
TYPE = 'PIXEL'
elif isinstance(geom, QgsRectangle):
TYPE = 'RECTANGLE'
ptUL = QgsPoint(geom.xMinimum(), geom.yMaximum())
ptLR = QgsPoint(geom.xMaximum(), geom.yMinimum())
else:
raise NotImplementedError()
for i, t in enumerate(pathsAndBandIndices): def loadProfiles(taskList, queue, cancelEvent, **kwargs):
path, bandIndices = t
for task in taskList:
assert isinstance(task, PixelLoaderTask)
if cancelEvent.is_set(): if cancelEvent.is_set():
return LOADING_CANCELED return LOADING_CANCELED
R = PixelLoaderResult(jobid, poolWorker, geom, path, **kwargs) result = doLoaderTask(task)
try: assert isinstance(result, PixelLoaderTask)
ds = gdal.Open(path, gdal.GA_ReadOnly)
gt = ds.GetGeoTransform()
R.srcCrsWkt = ds.GetProjection()
R.geoTransformation = gt
crsSrc = osr.SpatialReference(R.srcCrsWkt)
#print('SRC {} WKT:{}'.format(i, R.srcCrsWkt))
trans = osr.CoordinateTransformation(crsRequest, crsSrc)
def transformPoint2Px(trans, pt, gt): queue.put(result)
x,y,_ = trans.TransformPoint(pt.x(),pt.y())
return geo2px(QgsPoint(x,y), gt)
return LOADING_FINISHED
if TYPE == 'PIXEL': def transformPoint2Px(trans, pt, gt):
px = transformPoint2Px(trans, ptUL, gt) x, y, _ = trans.TransformPoint(pt.x(), pt.y())
if isOutOfImage(ds, px): return geo2px(QgsPoint(x, y), gt)
R.info = INFO_OUT_OF_IMAGE
q.put(R)
continue
size_x = 1 def doLoaderTask(task):
size_y = 1
elif TYPE == 'RECTANGLE': assert isinstance(task, PixelLoaderTask)
px = transformPoint2Px(trans, ptUL, gt)
px2 = transformPoint2Px(trans, ptLR, gt)
#todo: cut to existing pixel coordinates result = task
#if px2.x() > 0: px.setX(max([0, px.x()]))
if isOutOfImage(ds, px) or \ ds = gdal.Open(task.sourcePath, gdal.GA_ReadOnly)
isOutOfImage(ds, px2): nb, ns, nl = ds.RasterCount, ds.RasterXSize, ds.RasterYSize
R.info = INFO_OUT_OF_IMAGE
q.put(R)
continue
size_x = px2.x() - px.x()
size_y = px.y() - px2.y()
R.pxUL = px
R.pxSubsetSize = QSize(size_x, size_y)
values = None bandIndices = list(range(nb)) if task.bandIndices is None else list(task.bandIndices)
noData = None
if bandIndices is None: gt = ds.GetGeoTransform()
values = ds.ReadAsArray(px.x(), px.y(), size_x, size_y) result.resGeoTransformation = gt
noData = [ds.GetRasterBand(b + 1).GetNoDataValue() for b in range(ds.RasterCount)] result.resCrsWkt = ds.GetProjection()
else: crsSrc = osr.SpatialReference(result.resCrsWkt)
bandIndices = sorted(list(set(bandIndices)))
noData = []
for j, b in enumerate(bandIndices): #convert Geometries into pixel indices to be extracted
band = ds.GetRasterBand(b+1) PX_SUBSETS = []
bandData = band.ReadAsArray(px.x(), px.y(), size_x, size_y)
if values is None:
values = np.empty((len(bandIndices), size_y, size_x), dtype=bandData.dtype)
for geom in task.geometries:
noData.append(band.GetNoDataValue())
values[j,:] = bandData crsRequest = osr.SpatialReference()
crsRequest.ImportFromWkt(geom.crs().toWkt())
assert values.ndim == 3 trans = osr.CoordinateTransformation(crsRequest, crsSrc)
except Exception as ex: if isinstance(geom, QgsPoint):
R.exception = ex ptUL = ptLR = QgsPoint(geom)
q.put(R) elif isinstance(geom, QgsRectangle):
TYPE = 'RECTANGLE'
ptUL = QgsPoint(geom.xMinimum(), geom.yMaximum())
ptLR = QgsPoint(geom.xMaximum(), geom.yMinimum())
else:
PX_SUBSETS.append(INFO_OUT_OF_IMAGE)
pxUL = transformPoint2Px(trans, ptUL, gt)
pxLR = transformPoint2Px(trans, ptLR, gt)
bUL = isOutOfImage(ds, pxUL)
bLR = isOutOfImage(ds, pxLR)
if (bUL or bLR) and not (bUL and bLR):
#at least one point is inside the raster image:
PX_SUBSETS.append(INFO_OUT_OF_IMAGE)
continue continue
R.setValues(values, bandIndices=bandIndices, noDataValue=noData) if pxUL == pxLR:
size_x = size_y = 1
else:
size_x = abs(pxUL.x() - pxLR.x())
size_y = abs(pxUL.y() - pxLR.y())
if size_x < 1: size_x = 1
if size_y < 1: size_y = 1
PX_SUBSETS.append((pxUL, pxUL, size_x, size_y))
PROFILE_DATA = []
if bandIndices == range(ds.RasterCount):
#we have to extract all bands
#in this case we use gdal.Dataset.ReadAsArray()
for px in PX_SUBSETS:
if px == INFO_OUT_OF_IMAGE:
PROFILE_DATA.append(INFO_OUT_OF_IMAGE)
continue
pxUL, pxUL, size_x, size_y = px
PROFILE_DATA.append(ds.ReadAsArray(pxUL.x(), pxUL.y(), size_x, size_y))
else:
# access band values band-by-band
# in this case we use gdal.Band.ReadAsArray()
# and need to iterate over the requested band indices
#save the returned band values for each geometry in a separate list
#empty list == invalid geometry
for i in range(len(PX_SUBSETS)):
if PX_SUBSETS[i] == INFO_OUT_OF_IMAGE:
PROFILE_DATA.append(INFO_OUT_OF_IMAGE)
else:
PROFILE_DATA.append([])
for bandIndex in bandIndices:
band = ds.GetRasterBand(bandIndex+1)
assert isinstance(band, gdal.Band)
for i, px in enumerate(PX_SUBSETS):
if px == INFO_OUT_OF_IMAGE:
continue
pxUL, pxUL, size_x, size_y = px
bandData = band.ReadAsArray(pxUL.x(), pxUL.y(), size_x, size_y)
PROFILE_DATA[i].append(bandData)
for i in range(len(PX_SUBSETS)):
pd = PROFILE_DATA[i]
if len(pd) == 0:
PROFILE_DATA[i] = INFO_OUT_OF_IMAGE
else:
PROFILE_DATA[i] = np.dstack(pd).transpose(2,0,1)
#finally, ensure that there is on 2D array only
for i in range(len(PROFILE_DATA)):
d = PROFILE_DATA[i]
if d != INFO_OUT_OF_IMAGE:
_, _, size_x, size_y = PX_SUBSETS[i]
b, y, x = d.shape
assert size_y == y
assert size_x == x
assert b == len(bandIndices)
d = d.reshape((b, y*x))
vMean = d.mean(axis=1)
vStd = d.std(axis=1)
PROFILE_DATA[i] = (vMean, vStd)
task.resProfiles = PROFILE_DATA
task._done = True
return task
q.put(R)
return LOADING_FINISHED
...@@ -215,7 +312,7 @@ class PixelLoader(QObject): ...@@ -215,7 +312,7 @@ class PixelLoader(QObject):
Loads pixel from raster images Loads pixel from raster images
""" """
sigPixelLoaded = pyqtSignal(int, int, PixelLoaderResult) sigPixelLoaded = pyqtSignal(int, int, PixelLoaderTask)
sigLoadingStarted = pyqtSignal(list) sigLoadingStarted = pyqtSignal(list)
sigLoadingFinished = pyqtSignal(np.timedelta64) sigLoadingFinished = pyqtSignal(np.timedelta64)
sigLoadingCanceled = pyqtSignal() sigLoadingCanceled = pyqtSignal()
...@@ -243,15 +340,15 @@ class PixelLoader(QObject): ...@@ -243,15 +340,15 @@ class PixelLoader(QObject):
self.cancelEvent = self.MGR.Event() self.cancelEvent = self.MGR.Event()
@QtCore.pyqtSlot(PixelLoaderResult) @QtCore.pyqtSlot(PixelLoaderTask)
def onPixelLoaded(self, data): def onPixelLoaded(self, data):
assert isinstance(data, PixelLoaderResult) assert isinstance(data, PixelLoaderTask)
if data.jobId != self.jobid: if data._jobId != self.jobid:
#do not return results from previous jobs... #do not return results from previous jobs...
#print('got thread results from {} but need {}...'.format(jobid, self.jobid)) #print('got thread results from {} but need {}...'.format(jobid, self.jobid))
return return
else: else:
self.filesList.remove(data.source) self.filesList.remove(data.sourcePath)
self.sigPixelLoaded.emit(self.nMax - len(self.filesList), self.nMax, data) self.sigPixelLoaded.emit(self.nMax - len(self.filesList), self.nMax, data)
if len(self.filesList) == 0: if len(self.filesList) == 0:
...@@ -263,14 +360,15 @@ class PixelLoader(QObject): ...@@ -263,14 +360,15 @@ class PixelLoader(QObject):
assert nProcesses >= 1 assert nProcesses >= 1
self.nProcesses = nProcesses self.nProcesses = nProcesses
def startLoading(self, paths, theGeometry, bandIndices=None, profileID=''): def startLoading(self, tasks):
assert isinstance(paths, list)
if bandIndices is not None: assert isinstance(tasks, list)
assert len(bandIndices) == len(paths)
else: paths = []
bandIndices = [None for _ in paths] for t in tasks:
assert isinstance(t, PixelLoaderTask)
paths.append(t.sourcePath)
assert type(theGeometry) in [SpatialPoint, SpatialExtent]
self.mLoadingStartTime = np.datetime64('now', 'ms') self.mLoadingStartTime = np.datetime64('now', 'ms')
self.jobid += 1 self.jobid += 1
self.sigLoadingStarted.emit(paths[:]) self.sigLoadingStarted.emit(paths[:])
...@@ -280,14 +378,16 @@ class PixelLoader(QObject): ...@@ -280,14 +378,16 @@ class PixelLoader(QObject):
self.nMax = l self.nMax = l
self.nFailed = 0 self.nFailed = 0
#split number of files into list #split tasks into workpackages to be solve per parallel process
_pathsAndIndices = zip(paths, bandIndices)
workPackages = list() workPackages = list()
i = 0 i = 0
while(len(_pathsAndIndices)) > 0: _tasks = tasks[:]
while(len(_tasks)) > 0:
if len(workPackages) <= i: if len(workPackages) <= i:
workPackages.append([]) workPackages.append([])
workPackages[i].append(_pathsAndIndices.pop(0)) task = _tasks.pop(0)
task._jobId = self.jobid
workPackages[i].append(task)
i = i + 1 if i < self.nProcesses - 1 else 0 i = i + 1 if i < self.nProcesses - 1 else 0
from multiprocessing.pool import Pool from multiprocessing.pool import Pool
...@@ -303,14 +403,25 @@ class PixelLoader(QObject): ...@@ -303,14 +403,25 @@ class PixelLoader(QObject):
#print('theGeometryWKT: '+theGeometry.crs().toWkt()) #print('theGeometryWKT: '+theGeometry.crs().toWkt())
for i, workPackage in enumerate(workPackages): for i, workPackage in enumerate(workPackages):
args = (workPackage, self.jobid, i, theGeometry, self.resultQueue, self.cancelEvent) #args = (workPackage, self.jobid, i, theGeometries, self.resultQueue, self.cancelEvent)
kwds = {'profileID':profileID} # kwds = {'profileID':profileID}
#set workpackage / thread-specific internal metdata
for t in workPackage:
assert isinstance(t, PixelLoaderTask)
t.__processId = i
args = (workPackage, self.resultQueue, self.cancelEvent)
kwds = {}
if DEBUG: if DEBUG:
self.checkQueue(loadProfiles(*args, **kwds)) self.checkQueue(loadProfiles(*args, **kwds))
else: else:
r = self.pool.apply_async(loadProfiles, args=args, callback=self.checkQueue, **kwds) r = self.pool.apply_async(loadProfiles, args=args, callback=self.checkQueue, **kwds)
self.mAsyncResults.append(r) self.mAsyncResults.append(r)
if not DEBUG: if not DEBUG:
self.pool.close() self.pool.close()
...@@ -345,12 +456,12 @@ class PixelLoader(QObject): ...@@ -345,12 +456,12 @@ class PixelLoader(QObject):
if __name__ == '__main__': if __name__ == '__main__':
from timeseriesviewer.sandbox import initQgisEnvironment from timeseriesviewer.utils import initQgisApplication
qgsApp = initQgisEnvironment() qgsApp = initQgisApplication()
from PyQt4.QtGui import * from PyQt4.QtGui import *
gb = QGroupBox() gb = QGroupBox()
gb.setTitle('Sandbox') gb.setTitle('Sandbox')
DEBUG = True
PL = PixelLoader() PL = PixelLoader()
PL.setNumberOfProcesses(1) PL.setNumberOfProcesses(1)
...@@ -361,19 +472,27 @@ if __name__ == '__main__': ...@@ -361,19 +472,27 @@ if __name__ == '__main__':
files = [example.Images.Img_2014_05_07_LC82270652014127LGN00_BOA] files = [example.Images.Img_2014_05_07_LC82270652014127LGN00_BOA]
files.extend(file_search(dir, 're_*.tif')) files.extend(file_search(dir, 're_*.tif'))
ext = SpatialExtent.fromRasterSource(files[0]) ext = SpatialExtent.fromRasterSource(files[0])
pt = SpatialPoint(ext.crs(), ext.center())
from qgis.core import QgsPoint
x,y = ext.center()
geoms = [SpatialExtent(ext.crs(),x,y,x+250, y+70 ),
SpatialPoint(ext.crs(), x,y),
SpatialPoint(ext.crs(), x+250, y+70)]
from multiprocessing import Pool from multiprocessing import Pool
def onPxLoaded(*args): def onPxLoaded(*args):
n, nmax, plr = args n, nmax, task = args
assert isinstance(plr, PixelLoaderResult) assert isinstance(task, PixelLoaderTask)
print(task)
PL = PixelLoader() PL = PixelLoader()
def onDummy(*args): def onDummy(*args):
print(('dummy',args)) print(('dummy',args))
pass
s =""
def onTimer(*args): def onTimer(*args):
print(('TIMER',PL)) print(('TIMER',PL))
pass pass
...@@ -382,9 +501,15 @@ if __name__ == '__main__': ...@@ -382,9 +501,15 @@ if __name__ == '__main__':
PL.sigLoadingCanceled.connect(lambda: onDummy('canceled')) PL.sigLoadingCanceled.connect(lambda: onDummy('canceled'))
PL.sigLoadingStarted.connect(lambda: onDummy('started')) PL.sigLoadingStarted.connect(lambda: onDummy('started'))
PL.sigPixelLoaded.connect(lambda : onDummy('px loaded')) PL.sigPixelLoaded.connect(lambda : onDummy('px loaded'))
PL.startLoading(files, pt)
QTimer.singleShot(2000, lambda : PL.cancelLoading()) tasks = []
for i, f in enumerate(files):
kwargs = {'myid':'myID{}'.format(i)}
tasks.append(PixelLoaderTask(f, geoms, bandIndices=None, **kwargs))
PL.startLoading(tasks)
#QTimer.singleShot(2000, lambda : PL.cancelLoading())
qgsApp.exec_() qgsApp.exec_()
s = "" s = ""
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment