Source code for tradssat.tmpl.file

import os
import re

import numpy as np

from tradssat.utils import detect_encod
from .vals import FileValueSet, ValueSubSection
from .var import VariableSet, CODE_MISS


[docs]class File(object): """ Parent class for all file objects. """ def __init__(self, file): """ Parameters ---------- file: str """ self.file = file self._var_info = VariableSet(self._get_var_info()) self._values = FileValueSet() self.encoding = detect_encod(self.file) self._read() def _read(self): with open(self.file, encoding=self.encoding) as f: section = [] # To store lines that go in the same section for l in f.readlines(): if l[0] == '!': # skip comments continue if l[0] == '*': # start of section # Process any previously stored block if section: self._read_section(section) # Clear the current block section.clear() if l.strip(): section.append(l) # Append current line to block # Read the last block too self._read_section(section) def get_var_type(self, var, sect=None): return self.get_var(var, sect).type_ def get_var_lims(self, var, sect=None): return self.get_var(var, sect).lims def get_var_spc(self, var, sect=None, **kwargs): return self.get_var(var, sect).spc
[docs] def get_var_size(self, var, sect=None): """ Returns the size of a variable. Parameters ---------- var: str The name of the variable. sect: str The name of the section in which this variable appears (optional; for ambiguous cases where a file has several variables with the same code). Returns ------- int The size of the variable. """ return self.get_var(var, sect).size
def get_var_code_miss(self, var, sect=None): return self.get_var(var, sect).miss def get_var(self, var, sect=None): return self._var_info.get_var(var, sect)
[docs] def get_value(self, var, sect=None, subsect=None, cond=None): """ Parameters ---------- var sect subsect cond Returns ------- np.ndarray """ return self._values.get_value(var, sect=sect, subsect=subsect, cond=cond)
def get_dims_val(self, var): return self.get_value(var).shape def add_row(self, sect, subsect=None, vals=None): self._values.add_row(sect, subsect, vals) def remove_row(self, sect, subsect=None, cond=None): self._values.remove_row(sect, subsect, cond) def find_var_sect(self, var): return self._values.find_var_sect(var) def variables(self): return list(str(vr) for vr in self._var_info.variables()) def to_dict(self): return self._values.to_dict() def _read_subsection(self, section_name, subblock): var_names = self._get_var_names(subblock[0]) n_lines = len(subblock) - 1 # -1 for the header line (with "@" ) lengths = [self.get_var_size(vr) for vr in var_names] spaces = [self.get_var_spc(var = vr, header = subblock[0]) for vr in var_names] cum_lens = np.insert(np.cumsum(lengths) + np.cumsum(spaces), 0, 0) cutoffs = [(cum_lens[i], cum_lens[i + 1]) for i in range(len(var_names))] d_vals = {vr: self._gen_empty_mtrx(vr, n_lines) for vr in var_names} for i, l in enumerate(subblock[1:]): # Odd workaround necessary because several cultivar names in DSSAT are larger than the allowed space # and so run into the next column, which apparently isn't supposed to matter if the next column's value # is small enough to allow both to fit. (Really?!) vals = [ (l[0 if c[0] == 0 else max(c[0], l.find(' ', c[0], c[1] - 1)): None if l.find(' ', c[1] - 1) < 0 else l.find(' ', c[1] - 1)]).strip() for c in cutoffs] for vr, vl in zip(var_names, vals): if not vl: vl = self.get_var_code_miss(vr) d_vals[vr][i] = vl l_vars = [self._var_info.get_var(vr, sect=section_name) for vr in var_names] l_vals = [d_vals[vr] for vr in var_names] subsect = ValueSubSection(l_vars, l_vals) self._values[section_name].add_subsection(subsect) def _read_section(self, section): section_name, section_lines = self._process_section_header(section) subblock = [] for l in section_lines: # skip first line (with "*") if l[0] == '@': if subblock: self._read_subsection(section_name, subblock) subblock.clear() # Append current line to section if l.strip().strip('\x1a'): # '\x1a' needed for obscure character DSSAT likes to append to .SNX/SQX subblock.append(l) if subblock: self._read_subsection(section_name, subblock) def _gen_empty_mtrx(self, var, size): tp = self.get_var_type(var) if tp == 'float': dtype = float elif tp == 'int': dtype = int elif tp == 'str' or tp == str: str_size = self.get_var_size(var) dtype = 'U{}'.format(str_size + 5) # +5 just to be safe (with DSSAT input files you never know) else: dtype = tp return np.full(size, CODE_MISS, dtype=dtype) def _get_var_names(self, line): var_names = [str(vr) for vr in self._var_info] var_names.sort(key=len, reverse=True) def _strip(txt): return re.sub('^[|.\W]+', '', txt) final_names = [] line = _strip(line[1:]) # skip initial "@" while line: try: name = next(vr for vr in var_names if line.startswith(vr)) except StopIteration: raise ValueError( 'No variable matching "{line}" for file {nm}.'.format( line=line[:20], nm=os.path.split(self.file)[1] ) ) final_names.append(name) line = _strip(line[len(name):]) return final_names def __contains__(self, item): return item in self._values def _get_var_info(self): """ Return a set of variables. Returns ------- set[Variable] """ raise NotImplementedError
[docs] @classmethod def matches_file(cls, file): """ Checks whether a given file can be read by this class. Must be implemented in subclasses. Parameters ---------- file: str The file to be read. Returns ------- bool ``True`` if the file matches; ``False`` otherwise. """ raise NotImplementedError
def _process_section_header(self, lines): """ Parameters ---------- lines Returns ------- tuple[str, list] """ raise NotImplementedError