Skip to content
Snippets Groups Projects
csvdata.py 11 KiB
Newer Older
  • Learn to ignore specific revisions
  • # -*- coding: utf-8 -*-
    #!/usr/bin/env python3
    # noinspection PyPep8Naming
    """
    ***************************************************************************
        csv.py
        Reading and writing spectral profiles from CSV data
        ---------------------
        Date                 : Okt 2018
        Copyright            : (C) 2018 by Benjamin Jakimow
        Email                : benjamin.jakimow@geo.hu-berlin.de
    ***************************************************************************
    *                                                                         *
    *   This file is part of the EnMAP-Box.                                   *
    *                                                                         *
    *   The EnMAP-Box is free software; you can redistribute it and/or modify *
    *   it under the terms of the GNU General Public License as published by  *
    *   the Free Software Foundation; either version 3 of the License, or     *
    *   (at your option) any later version.                                   *
    *                                                                         *
    *   The EnMAP-Box is distributed in the hope that it will be useful,      *
    *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
    *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the          *
    *   GNU General Public License for more details.                          *
    *                                                                         *
    *   You should have received a copy of the GNU General Public License     *
    *   along with the EnMAP-Box. If not, see <http://www.gnu.org/licenses/>. *
    *                                                                         *
    ***************************************************************************
    """
    import os, sys, re, pathlib
    import csv as pycsv
    from .spectrallibraries import *
    
    class CSVSpectralLibraryIO(AbstractSpectralLibraryIO):
        """
        SpectralLibrary IO with CSV files.
        """
        STD_NAMES = ['WKT']+[n for n in createStandardFields().names()]
        REGEX_HEADERLINE = re.compile('^'+'\\t'.join(STD_NAMES)+'\\t.*')
        REGEX_BANDVALUE_COLUMN = re.compile(r'^(?P<bandprefix>\D+)?(?P<band>\d+)[ _]*(?P<xvalue>-?\d+\.?\d*)?[ _]*(?P<xunit>\D+)?', re.IGNORECASE)
    
        @staticmethod
        def canRead(path=None):
            if not isinstance(path, str):
                return False
    
            found = False
            try:
                with open(path, 'r', encoding='utf-8') as f:
                    for line in f:
                        if CSVSpectralLibraryIO.REGEX_HEADERLINE.search(line):
                            found = True
                            break
            except Exception as ex:
                return False
            return found
    
        @staticmethod
        def write(speclib, path, dialect=pycsv.excel_tab):
            assert isinstance(speclib, SpectralLibrary)
    
            text = CSVSpectralLibraryIO.asString(speclib, dialect=dialect)
            file = open(path, 'w')
            file.write(text)
            file.close()
            return [path]
    
        @staticmethod
        def readFrom(path=None, dialect=pycsv.excel_tab):
            f = open(path, 'r', encoding='utf-8')
            text = f.read()
            f.close()
    
            return CSVSpectralLibraryIO.fromString(text, dialect=dialect)
    
        @staticmethod
        def fromString(text:str, dialect=pycsv.excel_tab):
            # divide the text into blocks of CSV rows with same columns structure
            lines = text.splitlines(keepends=True)
            blocks = []
            currentBlock = ''
            for line in lines:
                assert isinstance(line, str)
                if len(line.strip()) == 0:
                    continue
                if CSVSpectralLibraryIO.REGEX_HEADERLINE.search(line):
                    if len(currentBlock) > 1:
                        blocks.append(currentBlock)
    
                    #start new block
                    currentBlock = line
                else:
                    currentBlock += line
            if len(currentBlock) > 1:
                blocks.append(currentBlock)
            if len(blocks) == 0:
                return None
    
            SLIB = SpectralLibrary()
            SLIB.startEditing()
    
            #read and add CSV blocks
            for block in blocks:
                R = pycsv.DictReader(block.splitlines(), dialect=dialect)
    
                #read entire CSV table
                columnVectors = {}
                for n in R.fieldnames:
                    columnVectors[n] = []
    
                nProfiles = 0
                for i, row in enumerate(R):
                    for k, v in row.items():
                        columnVectors[k].append(v)
                    nProfiles += 1
    
                #find missing fields, detect data type for and them to the SpectralLibrary
    
                bandValueColumnNames = sorted([n for n in R.fieldnames
                                           if CSVSpectralLibraryIO.REGEX_BANDVALUE_COLUMN.match(n)])
                specialHandlingColumns = bandValueColumnNames + ['WKT']
                addGeometry = 'WKT' in R.fieldnames
                addYValues = False
                xUnit = None
                x = []
    
                if len(bandValueColumnNames) > 0:
                    addYValues = True
                    for n in bandValueColumnNames:
                        match = CSVSpectralLibraryIO.REGEX_BANDVALUE_COLUMN.match(n)
                        xValue = match.group('xvalue')
                        if xUnit == None:
                            # extract unit from first columns that defines one
                            xUnit = match.group('xunit')
                        if xValue:
                            t = findTypeFromString(xValue)
                            x.append(toType(t, xValue))
    
    
    
                if len(x) > 0 and not len(x) == len(bandValueColumnNames):
                    print('Inconsistant band value column names. Unable to extract xValues (e.g. wavelength)', file=sys.stderr)
                    x = None
                elif len(x) == 0:
                    x = None
                missingQgsFields = []
    
                #find data type of missing fields
                for n in R.fieldnames:
                    assert isinstance(n, str)
                    if n in specialHandlingColumns:
                        continue
    
                    #find a none-empty string which describes a
                    #data value, get the type for and convert all str values into
                    values = columnVectors[n]
    
                    t = str
                    v = ''
                    for v in values:
                        if len(v) > 0:
                            t = findTypeFromString(v)
                            v = toType(t, v)
                            break
                    qgsField = createQgsField(n, v)
                    if n in bandValueColumnNames:
                        s = ""
    
                    #convert values to int, float or str
                    columnVectors[n] = toType(t, values, empty2None=True)
                    missingQgsFields.append(qgsField)
    
                #add missing fields
                if len(missingQgsFields) > 0:
                    SLIB.addMissingFields(missingQgsFields)
    
    
                #create a feature for each row
                yValueType = None
                for i in range(nProfiles):
                    p = SpectralProfile(fields=SLIB.fields())
                    if addGeometry:
                        g = QgsGeometry.fromWkt(columnVectors['WKT'][i])
                        p.setGeometry(g)
    
                    if addYValues:
                        y = [columnVectors[n][i] for n in bandValueColumnNames]
                        if yValueType is None and len(y) > 0:
                            yValueType = findTypeFromString(y[0])
    
                        y = toType(yValueType, y, True)
                        p.setValues(y=y, x=x, xUnit=xUnit)
    
                    #add other attributes
                    for n in [n for n in p.fieldNames() if n in list(columnVectors.keys())]:
    
                        p.setAttribute(n, columnVectors[n][i])
    
                    SLIB.addFeature(p)
    
    
            SLIB.commitChanges()
            return SLIB
    
    
        @staticmethod
        def asString(speclib, dialect=pycsv.excel_tab, skipValues=False, skipGeometry=False):
    
            assert isinstance(speclib, SpectralLibrary)
    
            attributeNames = [n for n in speclib.fieldNames()]
    
            stream = io.StringIO()
            for i, item in enumerate(speclib.groupBySpectralProperties().items()):
    
                xvalues, xunit, yunit = item[0]
                profiles = item[1]
                assert isinstance(profiles, list)
                attributeNames = attributeNames[:]
    
                valueNames = []
                for b, xvalue in enumerate(xvalues):
    
                    name = 'b{}'.format(b+1)
    
                    suffix = ''
                    if xunit is not None:
                        suffix+=str(xvalue)
                        suffix += xunit
                    elif xvalue != b:
                        suffix += str(xvalue)
    
                    if len(suffix)>0:
                        name += '_'+suffix
                    valueNames.append(name)
    
                fieldnames = []
                if not skipGeometry:
                    fieldnames += ['WKT']
                fieldnames += attributeNames
                if not skipGeometry:
                    fieldnames += valueNames
    
                W = pycsv.DictWriter(stream, fieldnames=fieldnames, dialect=dialect)
    
    
                W.writeheader()
    
                for p in profiles:
                    assert isinstance(p, SpectralProfile)
                    D = dict()
    
                    if not skipGeometry:
                        D['WKT'] = p.geometry().asWkt()
    
                    for n in attributeNames:
                        D[n] = value2str(p.attribute(n))
    
                    if not skipValues:
                        for i, yValue in enumerate(p.yValues()):
                            D[valueNames[i]] = yValue
    
                    W.writerow(D)
                W.writerow({}) #append empty row
    
    
            return stream.getvalue()
    
    
    
    class CSVWriterFieldValueConverter(QgsVectorFileWriter.FieldValueConverter):
        """
        A QgsVectorFileWriter.FieldValueConverter to convers SpectralLibrary values into strings
        """
        def __init__(self, speclib):
            super(CSVWriterFieldValueConverter, self).__init__()
            self.mSpeclib = speclib
            self.mNames = self.mSpeclib.fields().names()
            self.mCharactersToReplace = '\t'
            self.mReplacement = ' '
    
        def setSeparatorCharactersToReplace(self, charactersToReplace, replacement:str= ' '):
            """
            Specifies characters that need to be masked in string, i.e. the separator, to not violate the CSV structure.
            :param charactersToReplace: str | list of strings
            :param replacement: str, Tabulator by default
            """
            if isinstance(charactersToReplace, str):
                charactersToReplace = [charactersToReplace]
            assert replacement not in charactersToReplace
            self.mCharactersToReplace = charactersToReplace
            self.mReplacement = replacement
    
        def clone(self):
            c = CSVWriterFieldValueConverter(self.mSpeclib)
            c.setSeparatorCharactersToReplace(self.mCharactersToReplace, replacement=self.mReplacement)
            return c
    
        def convert(self, i, value):
            name = self.mNames[i]
            if name.startswith(HIDDEN_ATTRIBUTE_PREFIX):
                return str(pickle.loads(value))
            else:
    
                v = str(value)
                for c in self.mCharactersToReplace:
                    v = v.replace(c, self.mReplacement)
                return v
    
        def fieldDefinition(self, field):
            return field