Newer
Older
# -*- coding: utf-8 -*-
"""
/***************************************************************************

Benjamin Jakimow
committed
EO Time Series Viewer
-------------------
begin : 2015-08-20
git sha : $Format:%H$
copyright : (C) 2017 by HU-Berlin
email : benjamin.jakimow@geo.hu-berlin.de
***************************************************************************/
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/
"""
# noinspection PyPep8Naming
import sys, re, collections, traceback, time, json, urllib, types

Benjamin Jakimow
committed
import bisect
from qgis import *
from qgis.core import *
from qgis.gui import *

Benjamin Jakimow
committed
from qgis.PyQt.QtGui import *
from qgis.PyQt.QtWidgets import *
from qgis.PyQt.QtCore import *

Benjamin Jakimow
committed
from osgeo import gdal
from timeseriesviewer.dateparser import DOYfromDatetime64
from timeseriesviewer.utils import SpatialExtent, loadUI, px2geo

benjamin.jakimow@geo.hu-berlin.de
committed
gdal.SetConfigOption('VRT_SHARED_SOURCE', '0') #!important. really. do not change this.

Benjamin Jakimow
committed
from timeseriesviewer import SETTINGS, messageLog
from timeseriesviewer.dateparser import parseDateFromDataSet
def transformGeometry(geom, crsSrc, crsDst, trans=None):
    """
    Transforms a geometry with a QgsCoordinateTransform.

    :param geom: object handed to QgsCoordinateTransform.transform()
    :param crsSrc: QgsCoordinateReferenceSystem, source CRS; only evaluated if trans is None
    :param crsDst: QgsCoordinateReferenceSystem, destination CRS; only evaluated if trans is None
    :param trans: QgsCoordinateTransform, optional; created from crsSrc and crsDst if omitted
    :return: the result of trans.transform(geom)
    """
    if trans is not None:
        assert isinstance(trans, QgsCoordinateTransform)
        return trans.transform(geom)

    # no transformation given: derive it from the two CRS and re-enter with it
    assert isinstance(crsSrc, QgsCoordinateReferenceSystem)
    assert isinstance(crsDst, QgsCoordinateReferenceSystem)
    transformation = QgsCoordinateTransform(crsSrc, crsDst)
    return transformGeometry(geom, None, None, trans=transformation)
# Lookup of the GDAL data type codes. Each code is registered twice:
# under its short name (the part after 'GDT_') and under the full constant name.
GDAL_DATATYPES = {}
for var in vars(gdal):
    match = re.search(r'^GDT_(?P<type>.*)$', var)
    if match is None:
        continue
    number = getattr(gdal, var)
    GDAL_DATATYPES.update({match.group('type'): number, match.group(): number})
"nm": -9, "um": -6, "mm": -3, "cm": -2, "dm": -1, "m": 0, "hm": 2, "km": 3
}
# register the spelled-out unit names as synonyms of the abbreviated ones
METRIC_EXPONENTS.update({
    longName: METRIC_EXPONENTS[shortName]
    for longName, shortName in (
        ('nanometers', 'nm'),
        ('micrometers', 'um'),
        ('millimeters', 'mm'),
        ('centimeters', 'cm'),
        ('decimeters', 'dm'),
        ('meters', 'm'),
        ('hectometers', 'hm'),
        ('kilometers', 'km'),
    )
})
def convertMetricUnit(value, u1, u2):
    """
    Converts a value from one metric unit into another.

    :param value: value expressed in unit u1
    :param u1: source unit, must be a key of METRIC_EXPONENTS
    :param u2: target unit, must be a key of METRIC_EXPONENTS
    :return: value scaled by 10 to the power of the exponent difference between u1 and u2
    """
    for unit in (u1, u2):
        assert unit in METRIC_EXPONENTS.keys()
    exponentDifference = METRIC_EXPONENTS[u1] - METRIC_EXPONENTS[u2]
    return value * 10**exponentDifference
def getDS(pathOrDataset)->gdal.Dataset:
    """
    Returns the gdal.Dataset behind the argument.

    :param pathOrDataset: str | gdal.Dataset | QgsRasterLayer
    :return: gdal.Dataset; an already opened dataset is returned as is
    """
    # a QGIS raster layer is reduced to its source before anything else
    if isinstance(pathOrDataset, QgsRasterLayer):
        pathOrDataset = pathOrDataset.source()

    if isinstance(pathOrDataset, gdal.Dataset):
        return pathOrDataset

    dataset = gdal.Open(pathOrDataset)
    assert isinstance(dataset, gdal.Dataset)
    return dataset
def sensorID(nb:int, px_size_x:float, px_size_y:float, dt:int, wl:list, wlu:str)->str:
    """
    Create a sensor ID.

    The ID is the JSON serialization of (nb, px_size_x, px_size_y, dt, wl, wlu)
    and can be read back with sensorIDtoProperties().

    :param nb: number of bands
    :param px_size_x: pixel size x
    :param px_size_y: pixel size y
    :param dt: GDAL data type code, must be one of the values in GDAL_DATATYPES
    :param wl: list of wavelength with one entry per band, or None
    :param wlu: str, wavelength unit, or None
    :return: str
    """
    assert dt in GDAL_DATATYPES.values()
    assert isinstance(nb, int) and nb > 0
    assert isinstance(px_size_x, (int, float)) and px_size_x > 0
    assert isinstance(px_size_y, (int, float)) and px_size_y > 0

    # identity checks: "!= None" would be evaluated element-wise by array-like arguments
    if wl is not None:
        assert isinstance(wl, list)
        assert len(wl) == nb

    if wlu is not None:
        assert isinstance(wlu, str)

    return json.dumps((nb, px_size_x, px_size_y, dt, wl, wlu))
def sensorIDtoProperties(idString:str)->tuple:
    """
    Reads a sensor id string and returns the sensor properties. See sensorID().

    :param idString: str, JSON list with exactly six entries
    :return: (nb, px_size_x, px_size_y, dt, [wl], wlu)
    """
    nb, px_size_x, px_size_y, dt, wl, wlu = json.loads(idString)
    assert isinstance(dt, int) and dt >= 0
    assert isinstance(nb, int)
    assert isinstance(px_size_x, (int, float)) and px_size_x > 0
    assert isinstance(px_size_y, (int, float)) and px_size_y > 0

    # wavelength information is optional; JSON null arrives here as None
    if wl is not None:
        assert isinstance(wl, list)
    if wlu is not None:
        assert isinstance(wlu, str)

    return nb, px_size_x, px_size_y, dt, wl, wlu
class SensorInstrument(QObject):
"""
Describes a Sensor Configuration
"""
SensorNameSettingsPrefix = 'SensorName.'
sigNameChanged = pyqtSignal(str)
LUT_Wavelengths = dict({'B':480,
'G':570,
'R':660,
'nIR':850,
'swIR':1650,
'swIR1':1650,
'swIR2':2150
})
def __init__(self, sid:str, sensor_name:str=None, band_names:list = None):
super(SensorInstrument, self).__init__()
self.nb, self.px_size_x, self.px_size_y, self.dataType, self.wl, self.wlu = sensorIDtoProperties(self.mId)
if not isinstance(band_names, list):
band_names = ['Band {}'.format(b+1) for b in range(self.nb)]
assert len(band_names) == self.nb
self.bandNames = band_names
self.wlu = self.wlu
if self.wl is None:
self.wl = None
if sensor_name is None:
sensor_name = '{}bands@{}m'.format(self.nb, self.px_size_x)
sensor_name = SETTINGS.value(self._sensorSettingsKey(), sensor_name)
self.setName(sensor_name)
from .utils import TestObjects
import uuid
path = '/vsimem/mockupImage.{}.bsq'.format(uuid.uuid4())
self.mMockupDS = TestObjects.inMemoryImage(path=path, nb=self.nb, eType=self.dataType, ns=2, nl=2)
self.mMockupLayer = QgsRasterLayer(self.mMockupDS.GetFileList()[0])
#create an in-memory data set
return self.mMockupLayer
def _sensorSettingsKey(self)->str:
    """
    Returns the SETTINGS key under which the name of this sensor is stored:
    the class-wide SensorNameSettingsPrefix followed by the sensor id.
    :return: str
    """
    return SensorInstrument.SensorNameSettingsPrefix+self.mId
if name != self.mName:
self.mName = name
SETTINGS.setValue(self._sensorSettingsKey(), name)
self.sigNameChanged.emit(self.name())

Benjamin Jakimow
committed
if not isinstance(other, SensorInstrument):
return False
return hash(self.id())
return str(self.__class__) +' ' + self.name()
"""
Returns a human-readable description
:return: str
"""
info.append(self.name())
info.append('{} Bands'.format(self.nb))
info.append('Band\tName\tWavelength')
for b in range(self.nb):
else:
wl = 'unknown'
info.append('{}\t{}\t{}'.format(b + 1, self.bandNames[b], wl))
return '\n'.join(info)
def verifyInputImage(datasource):
"""
Checks if an image source can be uses as TimeSeriesDatum, i.e. if it can be read by gdal.Open() and
if we can extract an observation date as numpy.datetime64.
:param datasource: str with data source uri or gdal.Dataset
:return: bool
"""

benjamin.jakimow@geo.hu-berlin.de
committed
if isinstance(datasource, str):
datasource = gdal.Open(datasource)
if not isinstance(datasource, gdal.Dataset):

benjamin.jakimow@geo.hu-berlin.de
committed
if datasource.RasterCount == 0 and len(datasource.GetSubDatasets()) > 0:
#logger.error('Can not open container {}.\nPlease specify a subdataset'.format(path))
if datasource.GetDriver().ShortName == 'VRT':
files = datasource.GetFileList()
if len(files) > 0:
for f in files:
subDS = gdal.Open(f)
if not isinstance(subDS, gdal.Dataset):
return False
from timeseriesviewer.dateparser import parseDateFromDataSet
date = parseDateFromDataSet(datasource)

benjamin.jakimow@geo.hu-berlin.de
committed
return True
class TimeSeriesSource(object):
"""Provides some information on source images"""
Reads the argument and returns a TimeSeriesSource
:param source: gdal.Dataset, str, QgsRasterLayer
:return: TimeSeriesSource
ds = None
if isinstance(source, QgsRasterLayer):
lyr = source
provider = lyr.providerType()
if provider == 'gdal':
ds = gdal.Open(lyr.source())
elif provider == 'wcs':
parts = urllib.parse.parse_qs(lyr.source())
url = re.search(r'^[^?]+', parts['url'][0]).group()
identifier = re.search(r'^[^?]+', parts['identifier'][0]).group()
uri2 = 'WCS:{}?coverage={}'.format(url, identifier)
ds = gdal.Open(uri2)
if not isinstance(ds, gdal.Dataset) or ds.RasterCount == 0:
dsGetCoverage = gdal.Open('WCS:{}'.format(url))
for subdatasetUrl, id in dsGetCoverage.GetSubDatasets():
if id == identifier:
ds = gdal.Open(subdatasetUrl)
break
raise Exception('Unsupported raster data provider: {}'.format(provider))
elif isinstance(source, str):
ds = gdal.Open(source)
elif isinstance(source, gdal.Dataset):
ds = source
else:
raise Exception('Unsupported source: {}'.format(source))
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def __init__(self, dataset:gdal.Dataset):
    """
    Reads the properties of an opened raster dataset that describe it as source of a time series.

    :param dataset: gdal.Dataset with at least one band and a non-empty raster
    :raises AssertionError: if the dataset is empty or no acquisition date can be parsed from it
    """
    assert isinstance(dataset, gdal.Dataset)
    assert dataset.RasterCount > 0
    assert dataset.RasterYSize > 0
    assert dataset.RasterXSize > 0

    # the first entry of the file list serves as URI of this source
    self.mUri = dataset.GetFileList()[0]

    self.mDate = parseDateFromDataSet(dataset)
    assert self.mDate is not None, 'Unable to find acquisition date of {}'.format(self.mUri)

    self.mDrv = dataset.GetDriver().ShortName
    self.mGT = dataset.GetGeoTransform()
    self.mWKT = dataset.GetProjection()
    self.mCRS = QgsCoordinateReferenceSystem(self.mWKT)

    self.mWL, self.mWLU = extractWavelengths(dataset)

    self.nb, self.nl, self.ns = dataset.RasterCount, dataset.RasterYSize, dataset.RasterXSize
    self.mGeoTransform = dataset.GetGeoTransform()

    # ground sampling distance, taken from the x / y pixel size terms of the geo-transformation
    px_x = float(abs(self.mGeoTransform[1]))
    px_y = float(abs(self.mGeoTransform[5]))
    self.mGSD = (px_x, px_y)

    self.mDataType = dataset.GetRasterBand(1).DataType
    self.mSid = sensorID(self.nb, px_x, px_y, self.mDataType, self.mWL, self.mWLU)

    # metadata of all domains; GetMetadataDomainList() returns None instead of
    # an empty list if the dataset has no metadata domain at all
    self.mMetaData = collections.OrderedDict()
    for domain in dataset.GetMetadataDomainList() or []:
        self.mMetaData[domain] = dataset.GetMetadata_Dict(domain)

    self.mUL = QgsPointXY(*px2geo(QPoint(0, 0), self.mGeoTransform, pxCenter=False))
    # NOTE(review): the lower-right corner is derived from pixel (ns + 1, nl + 1) --
    # confirm against px2geo() that this does not overshoot the image by one pixel
    self.mLR = QgsPointXY(*px2geo(QPoint(self.ns + 1, self.nl + 1), self.mGeoTransform, pxCenter=False))
def name(self)->str:
    """
    Returns a name for this data source: the basename of its URI followed by its date.
    :return: str
    """
    # imported locally, the import block at the top of the module does not provide os
    import os
    bn = os.path.basename(self.uri())
    return '{} {}'.format(bn, self.date())
def uri(self)->str:
    """
    URI that can be used with GDAL to open a dataset.
    It is the first entry of the file list of the dataset this source was created from.
    :return: str
    """
    return self.mUri
def qgsMimeDataUtilsUri(self)->QgsMimeDataUtils.Uri:
    """
    Describes this source as QgsMimeDataUtils.Uri of a GDAL raster layer.
    :return: QgsMimeDataUtils.Uri
    """
    mimeUri = QgsMimeDataUtils.Uri()
    # fixed properties first: every source is a raster read by the gdal provider
    mimeUri.layerType = 'raster'
    mimeUri.providerKey = 'gdal'
    mimeUri.name = self.name()
    mimeUri.uri = self.uri()
    return mimeUri
def sid(self)->str:
    """
    Returns the sensor id, as created by sensorID() in the constructor
    :return: str
    """
    return self.mSid
def date(self)->np.datetime64:
    """
    Returns the acquisition date that was parsed from the dataset
    :return: np.datetime64
    """
    return self.mDate
def crs(self)->QgsCoordinateReferenceSystem:
    """
    Returns the coordinate reference system, created from the projection WKT of the dataset
    :return: QgsCoordinateReferenceSystem
    """
    return self.mCRS
def spatialExtent(self)->SpatialExtent:
    """
    Returns the extent spanned by the upper-left and lower-right corner coordinates, in the CRS of this source
    :return: SpatialExtent
    """
    return SpatialExtent(self.mCRS, self.mUL, self.mLR)
def __eq__(self, other):
    """
    Two TimeSeriesSources are equal if they refer to the same URI.
    :param other: any object
    :return: bool
    """
    return isinstance(other, TimeSeriesSource) and self.mUri == other.mUri
# emitted by setVisibility() with the new visibility, only if the value changed
sigVisibilityChanged = pyqtSignal(bool)
# TimeSeries.insertTSD() connects this signal to the removal of the datum from the time series
sigRemoveMe = pyqtSignal()
# emitted by addSource() after a new source was appended
sigSourcesChanged = pyqtSignal()
def __init__(self, timeSeries, date:np.datetime64, sensor:SensorInstrument):
    """
    Constructor
    :param timeSeries: TimeSeries, parent TimeSeries instance, optional
    :param date: np.datetime64,
    :param sensor: SensorInstrument
    """
    super(TimeSeriesDatum,self).__init__()
    assert isinstance(date, np.datetime64)
    assert isinstance(sensor, SensorInstrument)
    self.mSensor = sensor
    self.mDate = date
    self.mDOY = DOYfromDatetime64(self.mDate)
    self.mSources = []
    self.mMasks = []
    # setVisibility() reads the previous state, so it has to exist from the start
    # (assumed default: visible -- TODO confirm)
    self.mVisibility = True
    # parent time series; TimeSeries.insertTSD() / removeTSDs() update this reference
    self.mTimeSeries = timeSeries
def addSource(self, source):
    """
    Adds a time series source to this TimeSeriesDatum.
    The source has to match the date and the sensor id of this TimeSeriesDatum.
    :param source: TimeSeriesSource or any argument accepted by TimeSeriesSource.create()
    :return: TimeSeriesSource, if added, None if the source was already known
    """
    if not isinstance(source, TimeSeriesSource):
        # convert first, then run the checks below on the converted source
        return self.addSource(TimeSeriesSource.create(source))

    assert self.mDate == source.date()
    assert self.mSensor.id() == source.sid()

    if source in self.mSources:
        return None

    self.mSources.append(source)
    self.sigSourcesChanged.emit()
    return source
def setVisibility(self, b:bool):
    """
    Sets the visibility of the TimeSeriesDatum, i.e. whether linked MapCanvases will be shown to the user.
    sigVisibilityChanged is emitted only if the value differs from the previous one.
    :param b: bool
    """
    changed = self.mVisibility != b
    self.mVisibility = b
    if changed:
        self.sigVisibilityChanged.emit(b)
def isVisible(self):
    """
    Returns whether the TimeSeriesDatum is visible as MapCanvas
    :return: bool
    """
    # the value maintained by setVisibility()
    return self.mVisibility
def sensor(self)->SensorInstrument:
    """
    Returns the SensorInstrument this TimeSeriesDatum was created with
    :return: SensorInstrument
    """
    return self.mSensor
Returns the source images
:return: [list-of-TimeSeriesSource]
def sourceUris(self)->list:
    """
    Returns the URIs of all sources as list of strings.
    :return: [list-of-str]
    """
    return [tss.uri() for tss in self.sources()]
def qgsMimeDataUtilsUris(self)->list:
    """
    Returns all sources as list of QgsMimeDataUtils.Uri
    :return: [list-of-QgsMimeDataUtils.Uri]
    """
    return [s.qgsMimeDataUtilsUri() for s in self.sources()]
Returns the observation date
:return: numpy.datetime64
def doy(self)->int:
    """
    Returns the day of year (DOY), as derived from the date in the constructor
    :return: int
    """
    return int(self.mDOY)
ext = None
for i, tss in enumerate(self.sources()):
assert isinstance(tss, TimeSeriesSource)
if i == 0:
ext = tss.spatialExtent()
else:
ext.combineExtentWith(tss.spatialExtent())
return ext
def imageBorders(self)->QgsGeometry:
    """
    Returns the exact border polygon.
    Not implemented in the code shown here: the method always returns None.
    :return: QgsGeometry
    """
    return None
def __repr__(self):
    """
    Returns a text representation made of the date and the string form of the sensor
    :return: str
    """
    return 'TimeSeriesDatum({},{})'.format(self.mDate, str(self.mSensor))
def __eq__(self, other):
    """
    Two TimeSeriesDatum are equal if they share the observation date and the sensor id.
    :param other: any object
    :return: bool, False for objects that are not a TimeSeriesDatum
    """
    if not isinstance(other, TimeSeriesDatum):
        return False
    # compare the stored values; date and sensor are methods, so comparing
    # against other.date / other.sensor would compare against the method objects
    return self.mDate == other.mDate and self.mSensor.id() == other.mSensor.id()
def __len__(self):
    """
    Returns the number of sources of this TimeSeriesDatum
    :return: int
    """
    return len(self.mSources)
assert isinstance(other, TimeSeriesDatum)
if self.date() < other.date():

benjamin.jakimow@geo.hu-berlin.de
committed
return False
return self.sensor().id() < other.sensor().id()
def id(self):
    """
    Returns the identifier of this TimeSeriesDatum
    :return: tuple (observation date, sensor id)
    """
    return (self.mDate, self.mSensor.id())
def mimeDataUris(self)->list:
    """
    Returns the sources of this TSD as list of QgsMimeDataUtils.Uris
    :return: [list-of-QgsMimeDataUtils.Uri]
    """
    # one QgsMimeDataUtils.Uri per source, same result as qgsMimeDataUtilsUris()
    return [tss.qgsMimeDataUtilsUri() for tss in self.sources()]
def __hash__(self):
    """
    Returns a hash built from the (date, sensor id) tuple given by id()
    :return: int
    """
    return hash(self.id())
class TimeSeriesTableView(QTableView):
    """
    QTableView with a context menu to copy the selected cell values and to
    check / uncheck the selected rows of a TimeSeriesTableModel.
    """

    def __init__(self, parent=None):
        super(TimeSeriesTableView, self).__init__(parent)

    def contextMenuEvent(self, event):
        """
        Creates and shows an QMenu
        :param event: QContextMenuEvent
        """
        # parented to the view, so the menu stays alive after this method returns
        menu = QMenu(self)
        a = menu.addAction('Copy value(s)')
        a.triggered.connect(self.onCopyValues)
        a = menu.addAction('Check')
        a.triggered.connect(lambda: self.onSetCheckState(Qt.Checked))
        a = menu.addAction('Uncheck')
        a.triggered.connect(lambda: self.onSetCheckState(Qt.Unchecked))
        menu.popup(event.globalPos())

    def onSetCheckState(self, checkState):
        """
        Sets a CheckState to all selected rows
        :param checkState: Qt.CheckState
        """
        indices = self.selectionModel().selectedIndexes()
        rows = sorted({i.row() for i in indices})
        model = self.model()
        if isinstance(model, TimeSeriesTableModel):
            for r in rows:
                # the check state is set on the first column of each row
                idx = model.index(r, 0)
                model.setData(idx, checkState, Qt.CheckStateRole)

    def onCopyValues(self):
        """
        Copies selected cell values to the clipboard:
        one text line per row, the values of a row separated by ';'
        """
        indices = self.selectionModel().selectedIndexes()
        model = self.model()
        if isinstance(model, TimeSeriesTableModel):
            from collections import OrderedDict
            # row number -> display values of the selected cells, in selection order
            R = OrderedDict()
            for idx in indices:
                R.setdefault(idx.row(), []).append(model.data(idx, Qt.DisplayRole))
            info = '\n'.join(';'.join(str(v) for v in values) for values in R.values())
            QApplication.clipboard().setText(info)
class TimeSeriesDockUI(QgsDockWidget, loadUI('timeseriesdock.ui')):
    """
    QgsDockWidget that shows the TimeSeries
    """
    def __init__(self, parent=None):
        """
        :param parent: widget that provides the actions actionAddTSD, actionRemoveTSD,
            actionLoadTS, actionSaveTS and actionClearTS
        """
        super(TimeSeriesDockUI, self).__init__(parent)
        self.setupUi(self)

        # the buttons trigger the actions owned by the parent
        self.btnAddTSD.setDefaultAction(parent.actionAddTSD)
        self.btnRemoveTSD.setDefaultAction(parent.actionRemoveTSD)
        self.btnLoadTS.setDefaultAction(parent.actionLoadTS)
        self.btnSaveTS.setDefaultAction(parent.actionSaveTS)
        self.btnClearTS.setDefaultAction(parent.actionClearTS)

        self.progressBar.setMinimum(0)
        self.setProgressInfo(0, 100, 'Add images to fill time series')
        self.progressBar.setValue(0)
        self.progressInfo.setText(None)
        self.frameFilters.setVisible(False)

        self.setTimeSeries(None)

    def setStatus(self):
        """
        Updates the status of the TimeSeries
        """
        from timeseriesviewer.timeseries import TimeSeries
        if isinstance(self.TS, TimeSeries):
            ndates = len(self.TS)
            # sensor() and date() are methods of the TimeSeriesDatum and have to be called
            nsensors = len({tsd.sensor() for tsd in self.TS})
            msg = '{} scene(s) from {} sensor(s)'.format(ndates, nsensors)
            if ndates > 1:
                msg += ', {} to {}'.format(str(self.TS[0].date()), str(self.TS[-1].date()))
            self.progressInfo.setText(msg)

    def setProgressInfo(self, nDone:int, nMax:int, message=None):
        """
        Sets the progress bar of the TimeSeriesDockUI
        :param nDone: number of added data sources
        :param nMax: total number of data source to be added
        :param message: error / other kind of info message
        """
        if self.progressBar.maximum() != nMax:
            self.progressBar.setMaximum(nMax)
        self.progressBar.setValue(nDone)
        self.progressInfo.setText(message)
        QgsApplication.processEvents()
        if nDone == nMax:
            # all done: replace the progress message by the status text after 3 seconds
            QTimer.singleShot(3000, lambda: self.setStatus())

    def onSelectionChanged(self, *args):
        """
        Slot to react on user-driven changes of the selected TimeSeriesDatum rows.
        """
        self.btnRemoveTSD.setEnabled(self.SM is not None and len(self.SM.selectedRows()) > 0)

    def selectedTimeSeriesDates(self):
        """
        Returns the TimeSeriesDatum selected by a user.
        :return: [list-of-TimeSeriesDatum], empty if no TimeSeries is set
        """
        if self.SM is None:
            return []
        # the selection model works on the proxy model: map its indices to the
        # source model before asking the source model for the data
        return [self.mTSModel.data(self.mTSProxyModel.mapToSource(idx), Qt.UserRole)
                for idx in self.SM.selectedRows()]

    def setTimeSeries(self, TS):
        """
        Sets the TimeSeries to be shown in the TimeSeriesDockUI
        :param TS: TimeSeries
        """
        from timeseriesviewer.timeseries import TimeSeries
        self.TS = TS
        self.SM = None
        self.timeSeriesInitialized = False

        if isinstance(TS, TimeSeries):
            from timeseriesviewer.timeseries import TimeSeriesTableModel
            self.mTSModel = TimeSeriesTableModel(self.TS)
            self.mTSProxyModel = QSortFilterProxyModel(self)
            self.mTSProxyModel.setSourceModel(self.mTSModel)
            self.tableView_TimeSeries.setModel(self.mTSProxyModel)
            self.SM = QItemSelectionModel(self.mTSProxyModel)
            self.tableView_TimeSeries.setSelectionModel(self.SM)
            self.SM.selectionChanged.connect(self.onSelectionChanged)
            # Qt5 name of the former QHeaderView.setResizeMode()
            self.tableView_TimeSeries.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
            TS.sigLoadingProgress.connect(self.setProgressInfo)

        # updates the state of the remove button, also if no TimeSeries is set
        self.onSelectionChanged()
"""
The sorted list of data sources that specify the time series
"""
sigTimeSeriesDatesAdded = pyqtSignal(list)
# emitted by removeTSDs() with the list of removed TimeSeriesDatum
sigTimeSeriesDatesRemoved = pyqtSignal(list)
# TimeSeriesDockUI.setProgressInfo(nDone, nMax, message) gets connected to this signal
sigLoadingProgress = pyqtSignal(int, int, str)
# emitted with a sensor that was added to the list of sensors
sigSensorAdded = pyqtSignal(SensorInstrument)
# emitted by removeSensor() with the removed sensor
sigSensorRemoved = pyqtSignal(SensorInstrument)
sigRuntimeStats = pyqtSignal(dict)
def __init__(self, imageFiles=None, maskFiles=None):
    """
    Constructor
    :param imageFiles: optional. NOTE(review): not used by the statements visible here -- confirm where image sources get added
    :param maskFiles: optional, handed over to addMasks() if not None
    """
    QObject.__init__(self)
    # TimeSeriesDatum instances; insertTSD() keeps this list sorted
    self.mTSDs = list()
    # SensorInstruments known to this time series
    self.mSensors = []
    self.mShape = None
    if maskFiles is not None:
        self.addMasks(maskFiles)
_sep = ';'
def sensor(self, sid:str)->SensorInstrument:
    """
    Returns the sensor with sid = sid
    :param sid: str, sensor id
    :return: SensorInstrument, or None if no sensor with this id is known
    """
    assert isinstance(sid, str)
    for sensor in self.mSensors:
        assert isinstance(sensor, SensorInstrument)
        if sensor.id() == sid:
            return sensor
    return None
def sensors(self)->list:
    """
    Returns the list of sensors derived from the TimeSeries data sources
    :return: [list-of-SensorInstruments]
    """
    # a copy, so that callers can iterate it while sensors get added or removed
    return self.mSensors[:]
"""
Loads a CSV file with source images of a TimeSeries
:param path: str, Path of CSV file
:param n_max: optional, maximum number of files to load
"""
images = []
masks = []
with open(path, 'r') as f:
lines = f.readlines()
for l in lines:
if re.match('^[ ]*[;#&]', l):
continue
parts = re.split('[\n'+TimeSeries._sep+']', l)
parts = [p for p in parts if p != '']
images.append(parts[0])
if len(parts) > 1:
masks.append(parts[1])
"""
Saves the TimeSeries sources into a CSV file
:param path: str, path of CSV file
:return: path of CSV file
"""
lines = []
lines.append('#Time series definition file: {}'.format(np.datetime64('now').astype(str)))
line = TSD.pathImg
lines.append(line)
lines = [l+'\n' for l in lines]
messageLog('Time series source images written to {}'.format(path))
"""
Returns the pixel sizes of all SensorInstruments
:return: [list-of-QgsRectangles]
"""

benjamin.jakimow@geo.hu-berlin.de
committed
r = []

benjamin.jakimow@geo.hu-berlin.de
committed
r.append((QgsRectangle(sensor.px_size_x, sensor.px_size_y)))
return r
"""
Returns the maximum SpatialExtent of all images of the TimeSeries
:param crs: QgsCoordinateSystem to express the SpatialExtent coordinates.
:return:
"""
extent = None
for i, tsd in enumerate(self.mTSDs):
assert isinstance(tsd, TimeSeriesDatum)
ext = tsd.spatialExtent()
if isinstance(extent, SpatialExtent):
extent = extent.combineExtentWith(ext)
else:
extent = ext

benjamin.jakimow@geo.hu-berlin.de
committed
def getTSD(self, pathOfInterest):
    """
    Returns the TimeSeriesDatum related to an image source
    :param pathOfInterest: str, image source uri
    :return: TimeSeriesDatum, or None if no source URI contains pathOfInterest
    """
    for tsd in self.mTSDs:
        assert isinstance(tsd, TimeSeriesDatum)
        # a TimeSeriesDatum can have several sources: check the URI of each of them
        if any(pathOfInterest in uri for uri in tsd.sourceUris()):
            return tsd
    return None
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
def tsd(self, date:np.datetime64, sensor)->TimeSeriesDatum:
    """
    Returns the TimeSeriesDatum identified by date and sensor
    :param date: np.datetime64
    :param sensor: SensorInstrument | str with sensor id
    :return: TimeSeriesDatum, or None if there is no match
    """
    assert isinstance(date, np.datetime64)

    # a sensor id is resolved to the SensorInstrument first
    if isinstance(sensor, str):
        sensor = self.sensor(sensor)

    if not isinstance(sensor, SensorInstrument):
        return None

    for candidate in self.mTSDs:
        if candidate.date() == date and candidate.sensor() == sensor:
            return candidate
    return None
def insertTSD(self, tsd:TimeSeriesDatum)->TimeSeriesDatum:
    """
    Inserts a TimeSeriesDatum at its sorted position and links its signals to this TimeSeries.
    The TimeSeriesDatum must be new and its sensor must already be known.
    :param tsd: TimeSeriesDatum
    :return: TimeSeriesDatum, the inserted one
    """
    assert tsd not in self.mTSDs
    assert tsd.sensor() in self.mSensors

    def removeThisTSD():
        self.removeTSDs([tsd])

    def forwardSourcesChanged():
        self.sigSourcesChanged.emit(tsd)

    # bisect relies on the ordering of TimeSeriesDatum to find the position
    bisect.insort(self.mTSDs, tsd)
    tsd.mTimeSeries = self
    tsd.sigRemoveMe.connect(removeThisTSD)
    tsd.sigSourcesChanged.connect(forwardSourcesChanged)
    return tsd
def removeTSDs(self, tsds):
    """
    Removes TimeSeriesDatum from the TimeSeries and emits sigTimeSeriesDatesRemoved
    with the list of removed ones.
    :param tsds: [list-of-TimeSeriesDatum], each one must be part of this TimeSeries
    """
    dropped = []
    for datum in tsds:
        assert isinstance(datum, TimeSeriesDatum)
        assert datum in self.mTSDs
        self.mTSDs.remove(datum)
        # the datum does not belong to a time series any more
        datum.mTimeSeries = None
        dropped.append(datum)
    self.sigTimeSeriesDatesRemoved.emit(dropped)
def tsds(self, date:np.datetime64=None, sensor:SensorInstrument=None)->list:
    """
    Returns a list of TimeSeriesDatum of the TimeSeries. By default all TimeSeriesDatum will be returned.
    :param date: numpy.datetime64 to return the TimeSeriesDatum for
    :param sensor: SensorInstrument of interest to return the [list-of-TimeSeriesDatum] for.
    :return: [list-of-TimeSeriesDatum]
    """
    tsds = self.mTSDs[:]
    # "is not None" instead of truthiness: a filter value must not be skipped because it evaluates to False
    if date is not None:
        tsds = [tsd for tsd in tsds if tsd.date() == date]
    if sensor is not None:
        # sensor is a method of the TimeSeriesDatum and has to be called
        tsds = [tsd for tsd in tsds if tsd.sensor() == sensor]
    return tsds
"""
Removes all data sources from the TimeSeries (which will be empty after calling this routine).
"""
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
if not sensor in self.mSensors:
self.mSensors.append(sensor)
self.sigSensorAdded.emit(sensor)
return sensor
else:
return None
def checkSensorList(self):
    """
    Removes all sensors that are not used by any TimeSeriesDatum
    """
    # collect first, remove afterwards
    unused = [sensor for sensor in self.sensors()
              if not any(tsd.sensor() == sensor for tsd in self.mTSDs)]
    for sensor in unused:
        self.removeSensor(sensor)
def removeSensor(self, sensor:SensorInstrument)->SensorInstrument:
    """
    Removes a sensor together with all TimeSeriesDatum linked to it
    and emits sigSensorRemoved.
    :param sensor: SensorInstrument
    :return: SensorInstrument or None, if the sensor was not defined in the TimeSeries
    """
    assert isinstance(sensor, SensorInstrument)
    if sensor not in self.mSensors:
        return None

    # the observations of the sensor go first, then the sensor itself
    linked = [tsd for tsd in self.mTSDs if tsd.sensor() == sensor]
    self.removeTSDs(linked)
    self.mSensors.remove(sensor)
    self.sigSensorRemoved.emit(sensor)
    return sensor