RawDataReader API

AeroViz.rawDataReader.RawDataReader(instrument, path, reset=False, qc=True, start=None, end=None, mean_freq='1h', size_range=None, suppress_warnings=False, log_level='INFO', **kwargs)

Factory function to instantiate the appropriate reader module for a given instrument and return the processed data over the specified time range.

Parameters:

- instrument (str, required): The instrument name for which to read data; must be a valid key in the meta dictionary.

- path (Path or str, required): The directory where raw data files for the instrument are stored.

- reset (bool or str, default False): Data processing control mode:
  - False (default): use existing processed data if available
  - True: force reprocessing of all data from raw files
  - 'append': add new data to the existing processed data

- qc (bool or str, default True): Quality control and rate-calculation mode:
  - True (default): apply QC and calculate overall rates
  - False: skip QC and return raw data only
  - str: calculate rates at the given interval: 'W' (weekly), 'MS' (month start), 'QS' (quarter start), 'YS' (year start); a number prefix is allowed (e.g., '2MS' for bi-monthly)

- start (datetime or str, default None): Start time for filtering the data (required together with end).

- end (datetime or str, default None): End time for filtering the data (required together with start).

- mean_freq (str, default '1h'): Resampling frequency for averaging the data (e.g., '1h' for hourly means).

- size_range (tuple[float, float], default None): Size range in nanometers (min_size, max_size) for SMPS/APS data filtering.

- suppress_warnings (bool, default False): Whether to suppress warning messages.

- log_level ({'DEBUG', 'INFO', 'WARNING', 'ERROR'}, default 'INFO'): Logging level.

- **kwargs: Additional arguments passed to the reader module.

Returns:

- DataFrame: Processed data with the specified QC and time range applied.

Raises:

- ValueError: If the QC mode or mean_freq format is invalid.
- TypeError: If parameters are of incorrect type.
- KeyError: If the instrument name is not found in the supported instruments list.
- FileNotFoundError: If the path does not exist or cannot be accessed.

Examples:

>>> from AeroViz import RawDataReader
>>>
>>> # Using string inputs
>>> df_ae33 = RawDataReader(
...     instrument='AE33',
...     path='/path/to/your/data/folder',
...     reset=True,
...     qc='1MS',
...     start='2024-01-01',
...     end='2024-06-30',
...     mean_freq='1h',
... )
>>> # Using Path and datetime objects
>>> from pathlib import Path
>>> from datetime import datetime
>>>
>>> df_ae33 = RawDataReader(
...     instrument='AE33',
...     path=Path('/path/to/your/data/folder'),
...     reset=True,
...     qc='1MS',
...     start=datetime(2024, 1, 1),
...     end=datetime(2024, 6, 30),
...     mean_freq='1h',
... )
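
The size_range parameter restricts SMPS/APS spectra to a diameter window; a sketch with a hypothetical folder path:

>>> df_smps = RawDataReader(
...     instrument='SMPS',
...     path='/path/to/your/smps/folder',
...     start='2024-01-01',
...     end='2024-06-30',
...     size_range=(10, 500),  # nm; SMPS accepts 1-1000 nm
... )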
Source code in AeroViz/rawDataReader/__init__.py
def RawDataReader(instrument: str,
                  path: Path | str,
                  reset: bool | str = False,
                  qc: bool | str = True,
                  start: datetime | str = None,
                  end: datetime | str = None,
                  mean_freq: str = '1h',
                  size_range: tuple[float, float] | None = None,
                  suppress_warnings: bool = False,
                  log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR'] = 'INFO',
                  **kwargs):
    """
    Factory function to instantiate the appropriate reader module for a given instrument and
    return the processed data over the specified time range.

    Parameters
    ----------
    instrument : str
        The instrument name for which to read data, must be a valid key in the meta dictionary

    path : Path or str
        The directory where raw data files for the instrument are stored

    reset : bool or str
        Data processing control mode:
        False (default) - Use existing processed data if available
        True - Force reprocess all data from raw files
        'append' - Add new data to existing processed data

    qc : bool or str
        Quality control and rate calculation mode:
        True (default) - Apply QC and calculate overall rates
        False - Skip QC and return raw data only
        str - Calculate rates at specified intervals:
            'W' - Weekly rates
            'MS' - Month start rates
            'QS' - Quarter start rates
            'YS' - Year start rates
            Can add number prefix (e.g., '2MS' for bi-monthly)

    start : datetime
        Start time for filtering the data

    end : datetime
        End time for filtering the data

    mean_freq : str
        Resampling frequency for averaging the data (e.g., '1h' for hourly mean)

    size_range : tuple[float, float], optional
        Size range in nanometers (min_size, max_size) for SMPS/APS data filtering

    suppress_warnings : bool, optional
        Whether to suppress warning messages (default: False)

    log_level : {'DEBUG', 'INFO', 'WARNING', 'ERROR'}
        Logging level (default: 'INFO')

    **kwargs
        Additional arguments to pass to the reader module

    Returns
    -------
    pd.DataFrame
        Processed data with specified QC and time range

    Raises
    ------
    ValueError
        If QC mode or mean_freq format is invalid
    TypeError
        If parameters are of incorrect type
    KeyError
        If instrument name is not found in the supported instruments list
    FileNotFoundError
        If path does not exist or cannot be accessed

    Examples
    --------
    >>> from AeroViz import RawDataReader
    >>>
    >>> # Using string inputs
    >>> df_ae33 = RawDataReader(
    ...     instrument='AE33',
    ...     path='/path/to/your/data/folder',
    ...     reset=True,
    ...     qc='1MS',
    ...     start='2024-01-01',
    ...     end='2024-06-30',
    ...     mean_freq='1h',
    ... )

    >>> # Using Path and datetime objects
    >>> from pathlib import Path
    >>> from datetime import datetime
    >>>
    >>> df_ae33 = RawDataReader(
    ...     instrument='AE33',
    ...     path=Path('/path/to/your/data/folder'),
    ...     reset=True,
    ...     qc='1MS',
    ...     start=datetime(2024, 1, 1),
    ...     end=datetime(2024, 6, 30),
    ...     mean_freq='1h',
    ... )
    """

    # Mapping of instrument names to their respective classes
    instrument_class_map = {cls.__name__.split('.')[-1]: cls for cls in SUPPORTED_INSTRUMENTS}

    # Check if the instrument name is in the map
    if instrument not in meta.keys():
        raise KeyError(f"Instrument name '{instrument}' is not valid. \nMust be one of: {list(meta.keys())}")

    # Check if path exists and is a directory
    if not isinstance(path, Path):
        path = Path(path)
    if not path.exists() or not path.is_dir():
        raise FileNotFoundError(f"The specified path '{path}' does not exist or is not a directory.")

    # Validate the QC frequency
    if isinstance(qc, str):
        try:
            Grouper(freq=qc)
        except (ValueError, TypeError):
            raise ValueError(f"Invalid frequency: {qc}. Must be one of: "
                             f"W (week), MS (month start), QS (quarter start), YS (year start)")

    # Convert and verify input times
    if not (start and end):
        raise ValueError("Both start and end times must be provided.")

    # Convert start time if it's a string
    if isinstance(start, str):
        try:
            start = datetime.fromisoformat(start.replace('Z', '+00:00'))
        except ValueError as e:
            raise ValueError(
                f"Invalid start time format. Please use ISO format (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS): {e}")

    # Convert end time if it's a string
    if isinstance(end, str):
        try:
            end = datetime.fromisoformat(end.replace('Z', '+00:00'))
        except ValueError as e:
            raise ValueError(
                f"Invalid end time format. Please use ISO format (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS): {e}")

    if end <= start:
        raise ValueError(f"Invalid time range: start {start} is after end {end}")

    end = end.replace(hour=23, minute=59, second=59) if end.hour == 0 and end.minute == 0 else end

    # Verify the mean_freq format
    try:
        Timedelta(mean_freq)
    except ValueError:
        raise ValueError(
            f"Invalid mean_freq: '{mean_freq}'. It should be a valid frequency string (e.g., '1h', '30min', '1D').")

    # Validate size range
    if size_range is not None:
        if instrument not in SIZE_RANGE_INSTRUMENTS:
            raise ValueError(f"Size range filtering is only supported for {SIZE_RANGE_INSTRUMENTS}")

        min_size, max_size = size_range
        if not isinstance(min_size, (int, float)) or not isinstance(max_size, (int, float)):
            raise ValueError("Size range values must be numeric")
        if min_size >= max_size:
            raise ValueError("Minimum size must be less than maximum size")

        if instrument == 'SMPS':
            if not (1 <= min_size <= 1000) or not (1 <= max_size <= 1000):
                raise ValueError("SMPS size range must be between 1 and 1000 nm")
        elif instrument == 'APS':
            if not (500 <= min_size <= 20000) or not (500 <= max_size <= 20000):
                raise ValueError("APS size range must be between 500 and 20000 nm")

        kwargs.update({'size_range': size_range})

    kwargs.update({
        'suppress_warnings': suppress_warnings,
        'log_level': log_level
    })

    # Instantiate the class and return the instance
    reader_module = instrument_class_map[instrument].Reader(
        path=path,
        reset=reset,
        qc=qc,
        **kwargs
    )
    return reader_module(
        start=start,
        end=end,
        mean_freq=mean_freq,
    )

AbstractReader API

AeroViz.rawDataReader.core.AbstractReader

Bases: ABC

Abstract class for reading raw data from different instruments. Each instrument should have a separate class that inherits from this class and implements the abstract methods. The abstract methods are _raw_reader and _QC.

Lists the files in the path and reads the pickle file if it exists; otherwise it reads the raw data and dumps a pickle file. The pickle file is generated the first time the raw data is read; to re-read the raw data, set 'reset=True'.
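
To support a new instrument, subclass AbstractReader and implement the two abstract methods. A minimal sketch (the instrument name, file format, and QC rule below are hypothetical, not part of AeroViz):

import pandas as pd

from AeroViz.rawDataReader.core import AbstractReader


class Reader(AbstractReader):
    nam = 'MyInstrument'  # hypothetical key; must also exist in the meta dictionary

    def _raw_reader(self, file) -> pd.DataFrame | None:
        # Parse one raw file into a time-indexed DataFrame.
        df = pd.read_csv(file, parse_dates=['time'], index_col='time')
        return df if not df.empty else None

    def _QC(self, df: pd.DataFrame) -> pd.DataFrame:
        # Hypothetical QC rule: mask non-positive concentrations.
        return df.mask(df <= 0)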

Source code in AeroViz/rawDataReader/core/__init__.py
class AbstractReader(ABC):
    """
    Abstract class for reading raw data from different instruments. Each instrument should have a separate class that
    inherits from this class and implements the abstract methods. The abstract methods are `_raw_reader` and `_QC`.

    Lists the files in the path and reads the pickle file if it exists; otherwise it reads the raw data and dumps a
    pickle file. The pickle file is generated the first time the raw data is read; to re-read the raw data, set
    'reset=True'.
    """

    nam = 'AbstractReader'

    def __init__(self,
                 path: Path | str,
                 reset: bool | str = False,
                 qc: bool | str = True,
                 **kwargs):
        """A core initialized method for reading raw data from different instruments.

        Parameters
        ----------
        path : str | Path
            The directory containing the raw data files.
        reset : bool | str
            Whether to reprocess the raw data before reading ('append' appends new data).
        qc : bool | str
            Whether to apply quality control (a frequency string enables periodic rate reports).
        **kwargs : dict
            Additional keyword arguments passed to the reader.

        """
        self.path = Path(path)
        self.meta = meta[self.nam]
        output_folder = self.path / f'{self.nam.lower()}_outputs'
        output_folder.mkdir(parents=True, exist_ok=True)

        self.logger = ReaderLogger(
            self.nam, output_folder,
            kwargs.get('log_level', 'INFO').upper() if not kwargs.get('suppress_warnings') else 'ERROR')

        self.reset = reset is True
        self.append = reset == 'append'
        self.qc = qc  # if qc, then calculate rate
        self.qc_freq = qc if isinstance(qc, str) else None
        self.kwargs = kwargs

        self.pkl_nam = output_folder / f'_read_{self.nam.lower()}.pkl'
        self.csv_nam = output_folder / f'_read_{self.nam.lower()}.csv'
        self.pkl_nam_raw = output_folder / f'_read_{self.nam.lower()}_raw.pkl'
        self.csv_nam_raw = output_folder / f'_read_{self.nam.lower()}_raw.csv'
        self.csv_out = output_folder / f'output_{self.nam.lower()}.csv'
        self.report_out = output_folder / 'report.json'

    def __call__(self,
                 start: datetime,
                 end: datetime,
                 mean_freq: str = '1h',
                 ) -> pd.DataFrame:
        """Process data for specified time range."""

        data = self._run(start, end)

        if data is not None:
            data = data.resample(mean_freq).mean()
            data.to_csv(self.csv_out)

        return data

    @abstractmethod
    def _raw_reader(self, file):
        """Implement in child classes to read raw data files."""
        pass

    @abstractmethod
    def _QC(self, df: pd.DataFrame) -> pd.DataFrame:
        """Implement in child classes for quality control."""
        return df

    def __calculate_rates(self, raw_data, qc_data, all_keys=False, with_log=False):
        """Calculate acquisition rate, yield rate, and total rate.

        Parameters
        ----------
        raw_data : DataFrame
            Raw data before quality control
        qc_data : DataFrame
            Data after quality control
        all_keys : bool, default=False
            Whether to calculate rates for all deterministic keys
        with_log : bool, default=False
            Whether to output calculation logs

        Returns
        -------
        dict
            Dictionary containing calculated rates
        """
        if raw_data.empty or qc_data.empty:
            return {'acquisition_rate': 0, 'yield_rate': 0, 'total_rate': 0}

        def _calculate_single_key(key_name, key_columns):
            columns, drop_how = (qc_data.keys(), 'all') if key_columns == ['all'] else (key_columns, 'any')

            # Resample and count the valid hourly data points
            period_size = len(raw_data.resample('1h').mean().index)
            sample_size = len(raw_data[columns].resample('1h').mean().dropna(how=drop_how).index)
            qc_size = len(qc_data[columns].resample('1h').mean().dropna(how=drop_how).index)

            # Sanity-check the counts
            if any([
                period_size == 0 or sample_size == 0 or qc_size == 0,
                period_size < sample_size,
                sample_size < qc_size
            ]):
                if with_log:
                    self.logger.warning(f"\t\t No data for this period... skip")
                return None

            # Compute the rates
            sample_rate = round((sample_size / period_size) * 100, 1)
            valid_rate = round((qc_size / sample_size) * 100, 1)
            total_rate = round((qc_size / period_size) * 100, 1)

            if with_log:
                self.logger.info(f"\t\t> {key_name}")
                self.logger.info(
                    f"\t\t\t> {'Sample Rate':13}: {self.logger.BLUE}{sample_rate:>6.1f}%{self.logger.RESET}")
                self.logger.info(
                    f"\t\t\t> {'Valid  Rate':13}: {self.logger.BLUE}{valid_rate:>6.1f}%{self.logger.RESET}")
                self.logger.info(
                    f"\t\t\t> {'Total  Rate':13}: {self.logger.BLUE}{total_rate:>6.1f}%{self.logger.RESET}")

            return {
                'acquisition_rate': sample_rate,
                'yield_rate': valid_rate,
                'total_rate': total_rate
            }

        if all_keys:
            # Compute rates for every key and collect all results (for log output)
            all_results = []
            for name, columns in self.meta['deter_key'].items():
                result = _calculate_single_key(name, columns)
                if result:
                    all_results.append(result)

            if not all_results:
                return {'acquisition_rate': 0, 'yield_rate': 0, 'total_rate': 0}

            # Return the lowest rates across all results
            return {
                'acquisition_rate': min(r['acquisition_rate'] for r in all_results),
                'yield_rate': min(r['yield_rate'] for r in all_results),
                'total_rate': min(r['total_rate'] for r in all_results)
            }
        else:
            # Compute all keys but return only the lowest rates
            min_rates = {'acquisition_rate': 200, 'yield_rate': 200, 'total_rate': 200}

            for name, columns in self.meta['deter_key'].items():
                result = _calculate_single_key(name, columns)
                if result:
                    min_rates['acquisition_rate'] = min(min_rates['acquisition_rate'], result['acquisition_rate'])
                    min_rates['yield_rate'] = min(min_rates['yield_rate'], result['yield_rate'])
                    min_rates['total_rate'] = min(min_rates['total_rate'], result['total_rate'])

            # If there were no valid results, return 0
            if min_rates['acquisition_rate'] == 200 and min_rates['yield_rate'] == 200:
                return {'acquisition_rate': 0, 'yield_rate': 0, 'total_rate': 0}

            return min_rates

    def _rate_calculate(self, raw_data, qc_data) -> None:
        if self.meta['deter_key'] is not None:
            if self.qc_freq is not None:
                raw_data_grouped = raw_data.groupby(pd.Grouper(freq=self.qc_freq))
                qc_data_grouped = qc_data.groupby(pd.Grouper(freq=self.qc_freq))

                for (month, _sub_raw_data), (_, _sub_qc_data) in zip(raw_data_grouped, qc_data_grouped):
                    self.logger.info(
                        f"\t{self.logger.BLUE}> Processing: {_sub_raw_data.index[0].strftime('%F')}"
                        f" to {_sub_raw_data.index[-1].strftime('%F')}{self.logger.RESET}")

                    self.__calculate_rates(_sub_raw_data, _sub_qc_data, all_keys=True, with_log=True)
            else:
                self.__calculate_rates(raw_data, qc_data, all_keys=True, with_log=True)

            # Group the data by week and by month using Grouper
            current_time = datetime.now()

            # Group by week (weeks start on Monday)
            weekly_raw_groups = raw_data.groupby(pd.Grouper(freq='W-MON', label="left", closed="left"))
            weekly_qc_groups = qc_data.groupby(pd.Grouper(freq='W-MON', label="left", closed="left"))

            # for name, sub_df in weekly_raw_groups:
            #     print(f"\nWeek start date: {name}")
            #     print(f"Data range for the week: {sub_df.index.min()} to {sub_df.index.max()}")

            # Group by month (months start on the first day)
            monthly_raw_groups = raw_data.groupby(pd.Grouper(freq='MS'))
            monthly_qc_groups = qc_data.groupby(pd.Grouper(freq='MS'))

            # Generate the report
            self.__generate_grouped_report(
                current_time,
                weekly_raw_groups, weekly_qc_groups,
                monthly_raw_groups, monthly_qc_groups
            )

    def __generate_grouped_report(self, current_time, weekly_raw_groups, weekly_qc_groups,
                                  monthly_raw_groups, monthly_qc_groups):
        """Generate acquisition and yield reports based on grouped data"""
        report = {
            "report_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
            "instrument_info": {
                "station": self.path.name[:2],
                "instrument": self.nam
            },
            "rates": {
                "weekly": {},
                "monthly": {}
            }
        }

        # Process weekly data using the standard weekly time range
        for week_start, week_raw_data in weekly_raw_groups:
            # Fetch the matching QC data
            week_qc_data = weekly_qc_groups.get_group(
                week_start) if week_start in weekly_qc_groups.groups else pd.DataFrame()

            if not week_raw_data.empty:
                # Compute the standard week end (Sunday 23:59:59)
                week_end = week_start + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)

                # Use the week start date as the key
                period_key = week_start.strftime('%Y-%m-%d')

                report["rates"]["weekly"][period_key] = {
                    "start_time": week_start.strftime('%Y-%m-%d %H:%M:%S'),
                    "end_time": week_end.strftime('%Y-%m-%d %H:%M:%S'),
                    "rates": self.__calculate_rates(week_raw_data, week_qc_data)
                }

        # Process monthly data using the standard monthly time range
        for month_start, month_raw_data in monthly_raw_groups:
            # Fetch the matching QC data
            month_qc_data = monthly_qc_groups.get_group(
                month_start) if month_start in monthly_qc_groups.groups else pd.DataFrame()

            if not month_raw_data.empty:
                # Compute the standard month end (last day of month, 23:59:59)
                next_month_start = (month_start + pd.Timedelta(days=32)).replace(day=1)
                month_end = next_month_start - pd.Timedelta(seconds=1)

                # Use the month as the key
                period_key = month_start.strftime('%Y-%m')

                report["rates"]["monthly"][period_key] = {
                    "start_time": month_start.strftime('%Y-%m-%d %H:%M:%S'),
                    "end_time": month_end.strftime('%Y-%m-%d %H:%M:%S'),
                    "rates": self.__calculate_rates(month_raw_data, month_qc_data)
                }

        # Write the report
        with open(self.report_out, 'w') as f:
            json.dump(report, f, indent=4)

    def _timeIndex_process(self, _df, user_start=None, user_end=None, append_df=None):
        """Process time index, resample data, extract specified time range, and optionally append new data.

        Parameters
        ----------
        _df : pandas.DataFrame
            Input DataFrame with time index
        user_start : datetime or str, optional
            Start of user-specified time range
        user_end : datetime or str, optional
            End of user-specified time range
        append_df : pandas.DataFrame, optional
            DataFrame to append to the result

        Returns
        -------
        pandas.DataFrame
            Processed DataFrame with properly formatted time index
        """
        # Round timestamps and remove duplicates
        _df = _df.groupby(_df.index.floor('1min')).first()

        # Determine frequency
        freq = _df.index.inferred_freq or self.meta['freq']

        # Append new data if provided
        if append_df is not None:
            append_df.index = append_df.index.round('1min')
            _df = pd.concat([append_df.dropna(how='all'), _df.dropna(how='all')])
            _df = _df.loc[~_df.index.duplicated()]

        # Determine time range
        df_start, df_end = _df.index.sort_values()[[0, -1]]

        # Create new time index
        new_index = pd.date_range(user_start or df_start, user_end or df_end, freq=freq, name='time')

        # Process data: convert to numeric, resample, and reindex with controlled tolerance
        if freq in ['1min', 'min', 'T']:
            # For minute-level data, use a smaller tolerance, e.g. 30 seconds
            return _df.reindex(new_index, method='nearest', tolerance='30s')
        elif freq in ['1h', 'h', 'H']:
            # For hourly data, use a 30-minute tolerance,
            # so 08:20 matches 08:00 but not 09:00
            return _df.reindex(new_index, method='nearest', tolerance='30min')
        else:
            # For other frequencies, set the tolerance to half the frequency
            if isinstance(freq, str) and freq[-1].isalpha():
                # If freq has the form 'number + unit', e.g. '2h', '3min'
                try:
                    num = int(freq[:-1])
                    unit = freq[-1]
                    half_freq = f"{num // 2}{unit}" if num > 1 else f"30{'min' if unit == 'h' else 's'}"
                    return _df.reindex(new_index, method='nearest', tolerance=half_freq)
                except ValueError:
                    # Could not parse freq; fall back to the default
                    return _df.reindex(new_index, method='nearest', tolerance=freq)
            else:
                return _df.reindex(new_index, method='nearest', tolerance=freq)

    def _outlier_process(self, _df):
        """Process outliers."""
        outlier_file = self.path / 'outlier.json'

        if not outlier_file.exists():
            return _df

        with outlier_file.open('r', encoding='utf-8', errors='ignore') as f:
            outliers = json.load(f)
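        # Expected outlier.json layout (an assumption inferred from the loop below;
        # each value is a [start, end] timestamp pair whose rows are masked, and the keys are arbitrary labels):
        # {
        #     "maintenance": ["2024-03-01 00:00", "2024-03-02 12:00"],
        #     "spike_event": ["2024-05-10 08:00", "2024-05-10 20:00"]
        # }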

        for _st, _ed in outliers.values():
            _df.loc[_st:_ed] = np.nan

        return _df

    def _save_data(self, raw_data: pd.DataFrame, qc_data: pd.DataFrame) -> None:
        """Save data to files."""
        try:
            raw_data.to_pickle(self.pkl_nam_raw)
            raw_data.to_csv(self.csv_nam_raw)

            if self.meta['deter_key'] is not None:
                qc_data.to_pickle(self.pkl_nam)
                qc_data.to_csv(self.csv_nam)

        except Exception as e:
            raise IOError(f"Error saving data. {e}")

    @contextmanager
    def progress_reading(self, files: list) -> Generator:
        # Create message temporary storage and replace logger method
        logs = {level: [] for level in ['info', 'warning', 'error']}
        original = {level: getattr(self.logger, level) for level in logs}

        for level, msgs in logs.items():
            setattr(self.logger, level, msgs.append)

        try:
            with Progress(
                    SpinnerColumn(finished_text="✓"),
                    BarColumn(bar_width=25, complete_style="green", finished_style="bright_green"),
                    TaskProgressColumn(style="bold", text_format="[bright_green]{task.percentage:>3.0f}%"),
                    TextColumn("{task.description}", style="bold blue"),
                    TextColumn("{task.fields[filename]}", style="bold blue"),
                    console=Console(force_terminal=True, color_system="auto", width=120),
                    expand=False
            ) as progress:
                task = progress.add_task(f"Reading {self.nam} files:", total=len(files), filename="")
                yield progress, task
        finally:
            # Restore logger method and output message
            for level, msgs in logs.items():
                setattr(self.logger, level, original[level])
                for msg in msgs:
                    original[level](msg)

    def _read_raw_files(self) -> tuple[pd.DataFrame | None, pd.DataFrame | None]:
        """ **Read and process raw files.** """
        files = [f
                 for file_pattern in self.meta['pattern']
                 for pattern in {file_pattern.lower(), file_pattern.upper(), file_pattern}
                 for f in self.path.glob(pattern)
                 if f.name not in [self.csv_out.name, self.csv_nam.name, self.csv_nam_raw.name, f'{self.nam}.log']]

        if not files:
            raise FileNotFoundError(f"No files in '{self.path}' could be read. Please check the current path.")

        df_list = []

        # Context manager for progress bar display
        with self.progress_reading(files) as (progress, task):
            for file in files:
                progress.update(task, advance=1, filename=file.name)
                try:
                    if (df := self._raw_reader(file)) is not None and not df.empty:
                        df_list.append(df)
                    else:
                        self.logger.debug(f"\tFile {file.name} produced an empty DataFrame or None.")

                except Exception as e:
                    self.logger.error(f"Error reading {file.name}: {e}")

        if not df_list:
            raise ValueError(f"\033[41m\033[97mAll files were either empty or failed to read.\033[0m")

        raw_data = pd.concat(df_list, axis=0).groupby(level=0).first()

        if self.nam in ['SMPS', 'APS', 'GRIMM']:
            raw_data = raw_data.sort_index(axis=1, key=lambda x: x.astype(float))

        raw_data = self._timeIndex_process(raw_data)

        raw_data = raw_data.apply(pd.to_numeric, errors='coerce').copy(deep=True)
        qc_data = self._QC(raw_data).apply(pd.to_numeric, errors='coerce').copy(deep=True)

        return raw_data, qc_data

    def _run(self, user_start, user_end):
        # Read the pickle if it exists and reset is False; otherwise process the raw data or append new data
        if self.pkl_nam_raw.exists() and self.pkl_nam.exists() and not self.reset:
            self.logger.info_box(f"Reading {self.nam} PICKLE from {user_start} to {user_end}")

            _f_raw_done, _f_qc_done = pd.read_pickle(self.pkl_nam_raw), pd.read_pickle(self.pkl_nam)

            if self.append:
                self.logger.info_box(f"Appending New data from {user_start} to {user_end}")

                _f_raw_new, _f_qc_new = self._read_raw_files()
                _f_raw = self._timeIndex_process(_f_raw_done, append_df=_f_raw_new)
                _f_qc = self._timeIndex_process(_f_qc_done, append_df=_f_qc_new)

            else:
                _f_raw, _f_qc = _f_raw_done, _f_qc_done

                return _f_qc if self.qc else _f_raw

        else:
            self.logger.info_box(f"Reading {self.nam} RAW DATA from {user_start} to {user_end}")

            _f_raw, _f_qc = self._read_raw_files()

        # process time index
        _f_raw = self._timeIndex_process(_f_raw, user_start, user_end)
        _f_qc = self._timeIndex_process(_f_qc, user_start, user_end)
        _f_qc = self._outlier_process(_f_qc)

        # save
        self._save_data(_f_raw, _f_qc)

        if self.qc:
            self._rate_calculate(_f_raw.apply(pd.to_numeric, errors='coerce'),
                                 _f_qc.apply(pd.to_numeric, errors='coerce'))

        return _f_qc if self.qc else _f_raw

    @staticmethod
    def reorder_dataframe_columns(df, order_lists: list[list], keep_others: bool = False):
        """Reorder DataFrame columns."""
        new_order = []

        for order in order_lists:
            # Only add columns that exist in the DataFrame, without adding them repeatedly
            new_order.extend([col for col in order if col in df.columns and col not in new_order])

        if keep_others:
            # Add all original fields not in the new order list, keeping their original order
            new_order.extend([col for col in df.columns if col not in new_order])

        return df[new_order]

    @staticmethod
    def time_aware_IQR_QC(df: pd.DataFrame, time_window='1D', log_dist=False) -> pd.DataFrame:
        return QualityControl().time_aware_iqr(df, time_window=time_window, log_dist=log_dist)

    @staticmethod
    def filter_error_status(_df, error_codes, special_codes=None):
        """Filter data containing specified error status codes and specially handle certain specific codes.

        Parameters
        ----------
        _df : pandas.DataFrame
            A DataFrame containing a 'Status' column
        error_codes : list
            A list of status codes for bitwise testing
        special_codes : list, optional
            List of special status codes for exact matching

        Returns
        -------
        pandas.DataFrame
            Filtered DataFrame

        Notes
        -----
        This function performs two types of filtering:
        1. Bitwise filtering that checks if any error_codes are present in the Status
        2. Exact matching for special_codes
        """
        # Create an empty mask
        error_mask = pd.Series(False, index=_df.index)

        # Convert Status to integer (if it's not already)
        status_values = pd.to_numeric(_df['Status'], errors='coerce').fillna(0).astype(int)

        # Bitwise test normal error codes
        for code in error_codes:
            # Use bitwise operation on the integer-converted status_values
            error_mask = error_mask | ((status_values & code) != 0)

        # Exact matching for special codes
        if special_codes:
            error_mask = error_mask | status_values.isin(special_codes)

        # Mask rows containing any errors
        return _df.mask(error_mask)

__calculate_rates(raw_data, qc_data, all_keys=False, with_log=False)

Calculate acquisition rate, yield rate, and total rate.

Parameters:

- raw_data (DataFrame, required): Raw data before quality control.
- qc_data (DataFrame, required): Data after quality control.
- all_keys (bool, default False): Whether to calculate rates for all deterministic keys.
- with_log (bool, default False): Whether to output calculation logs.

Returns:

- dict: Dictionary containing the calculated rates.
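
To make the three rates concrete, a worked example with hypothetical counts for a 31-day month of hourly data:

period_size = 744  # hourly slots in the period (31 days x 24 h)
sample_size = 700  # slots where raw data is present
qc_size = 650      # slots that survive quality control

acquisition_rate = round(sample_size / period_size * 100, 1)  # 94.1 (sample rate)
yield_rate = round(qc_size / sample_size * 100, 1)            # 92.9 (valid rate)
total_rate = round(qc_size / period_size * 100, 1)            # 87.4 (total rate)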

Source code in AeroViz/rawDataReader/core/__init__.py
def __calculate_rates(self, raw_data, qc_data, all_keys=False, with_log=False):
    """Calculate acquisition rate, yield rate, and total rate.

    Parameters
    ----------
    raw_data : DataFrame
        Raw data before quality control
    qc_data : DataFrame
        Data after quality control
    all_keys : bool, default=False
        Whether to calculate rates for all deterministic keys
    with_log : bool, default=False
        Whether to output calculation logs

    Returns
    -------
    dict
        Dictionary containing calculated rates
    """
    if raw_data.empty or qc_data.empty:
        return {'acquisition_rate': 0, 'yield_rate': 0, 'total_rate': 0}

    def _calculate_single_key(key_name, key_columns):
        columns, drop_how = (qc_data.keys(), 'all') if key_columns == ['all'] else (key_columns, 'any')

        # Resample and count the valid hourly data points
        period_size = len(raw_data.resample('1h').mean().index)
        sample_size = len(raw_data[columns].resample('1h').mean().dropna(how=drop_how).index)
        qc_size = len(qc_data[columns].resample('1h').mean().dropna(how=drop_how).index)

        # Sanity-check the counts
        if any([
            period_size == 0 or sample_size == 0 or qc_size == 0,
            period_size < sample_size,
            sample_size < qc_size
        ]):
            if with_log:
                self.logger.warning(f"\t\t No data for this period... skip")
            return None

        # Compute the rates
        sample_rate = round((sample_size / period_size) * 100, 1)
        valid_rate = round((qc_size / sample_size) * 100, 1)
        total_rate = round((qc_size / period_size) * 100, 1)

        if with_log:
            self.logger.info(f"\t\t> {key_name}")
            self.logger.info(
                f"\t\t\t> {'Sample Rate':13}: {self.logger.BLUE}{sample_rate:>6.1f}%{self.logger.RESET}")
            self.logger.info(
                f"\t\t\t> {'Valid  Rate':13}: {self.logger.BLUE}{valid_rate:>6.1f}%{self.logger.RESET}")
            self.logger.info(
                f"\t\t\t> {'Total  Rate':13}: {self.logger.BLUE}{total_rate:>6.1f}%{self.logger.RESET}")

        return {
            'acquisition_rate': sample_rate,
            'yield_rate': valid_rate,
            'total_rate': total_rate
        }

    if all_keys:
        # Compute rates for every key and collect all results (for log output)
        all_results = []
        for name, columns in self.meta['deter_key'].items():
            result = _calculate_single_key(name, columns)
            if result:
                all_results.append(result)

        if not all_results:
            return {'acquisition_rate': 0, 'yield_rate': 0, 'total_rate': 0}

        # Return the lowest rates across all results
        return {
            'acquisition_rate': min(r['acquisition_rate'] for r in all_results),
            'yield_rate': min(r['yield_rate'] for r in all_results),
            'total_rate': min(r['total_rate'] for r in all_results)
        }
    else:
        # Compute all keys but return only the lowest rates
        min_rates = {'acquisition_rate': 200, 'yield_rate': 200, 'total_rate': 200}

        for name, columns in self.meta['deter_key'].items():
            result = _calculate_single_key(name, columns)
            if result:
                min_rates['acquisition_rate'] = min(min_rates['acquisition_rate'], result['acquisition_rate'])
                min_rates['yield_rate'] = min(min_rates['yield_rate'], result['yield_rate'])
                min_rates['total_rate'] = min(min_rates['total_rate'], result['total_rate'])

        # If there were no valid results, return 0
        if min_rates['acquisition_rate'] == 200 and min_rates['yield_rate'] == 200:
            return {'acquisition_rate': 0, 'yield_rate': 0, 'total_rate': 0}

        return min_rates

__call__(start, end, mean_freq='1h')

Process data for specified time range.
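
In normal use this is invoked by the RawDataReader factory; a direct-call sketch (assuming reader is an already constructed Reader instance):

from datetime import datetime

df = reader(start=datetime(2024, 1, 1), end=datetime(2024, 6, 30), mean_freq='1h')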

Source code in AeroViz/rawDataReader/core/__init__.py
def __call__(self,
             start: datetime,
             end: datetime,
             mean_freq: str = '1h',
             ) -> pd.DataFrame:
    """Process data for specified time range."""

    data = self._run(start, end)

    if data is not None:
        data = data.resample(mean_freq).mean()
        data.to_csv(self.csv_out)

    return data

__generate_grouped_report(current_time, weekly_raw_groups, weekly_qc_groups, monthly_raw_groups, monthly_qc_groups)

Generate acquisition and yield reports based on grouped data

Source code in AeroViz/rawDataReader/core/__init__.py
def __generate_grouped_report(self, current_time, weekly_raw_groups, weekly_qc_groups,
                              monthly_raw_groups, monthly_qc_groups):
    """Generate acquisition and yield reports based on grouped data"""
    report = {
        "report_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
        "instrument_info": {
            "station": self.path.name[:2],
            "instrument": self.nam
        },
        "rates": {
            "weekly": {},
            "monthly": {}
        }
    }

    # Process weekly data using the standard weekly time range
    for week_start, week_raw_data in weekly_raw_groups:
        # Fetch the matching QC data
        week_qc_data = weekly_qc_groups.get_group(
            week_start) if week_start in weekly_qc_groups.groups else pd.DataFrame()

        if not week_raw_data.empty:
            # Compute the standard week end (Sunday 23:59:59)
            week_end = week_start + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)

            # Use the week start date as the key
            period_key = week_start.strftime('%Y-%m-%d')

            report["rates"]["weekly"][period_key] = {
                "start_time": week_start.strftime('%Y-%m-%d %H:%M:%S'),
                "end_time": week_end.strftime('%Y-%m-%d %H:%M:%S'),
                "rates": self.__calculate_rates(week_raw_data, week_qc_data)
            }

    # Process monthly data using the standard monthly time range
    for month_start, month_raw_data in monthly_raw_groups:
        # Fetch the matching QC data
        month_qc_data = monthly_qc_groups.get_group(
            month_start) if month_start in monthly_qc_groups.groups else pd.DataFrame()

        if not month_raw_data.empty:
            # Compute the standard month end (last day of month, 23:59:59)
            next_month_start = (month_start + pd.Timedelta(days=32)).replace(day=1)
            month_end = next_month_start - pd.Timedelta(seconds=1)

            # Use the month as the key
            period_key = month_start.strftime('%Y-%m')

            report["rates"]["monthly"][period_key] = {
                "start_time": month_start.strftime('%Y-%m-%d %H:%M:%S'),
                "end_time": month_end.strftime('%Y-%m-%d %H:%M:%S'),
                "rates": self.__calculate_rates(month_raw_data, month_qc_data)
            }

    # Write the report
    with open(self.report_out, 'w') as f:
        json.dump(report, f, indent=4)
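
The resulting report.json has roughly this shape (a sketch; the station, dates, and rates are hypothetical):

{
    "report_time": "2024-07-01 12:00:00",
    "instrument_info": {"station": "NZ", "instrument": "AE33"},
    "rates": {
        "weekly": {
            "2024-06-03": {
                "start_time": "2024-06-03 00:00:00",
                "end_time": "2024-06-09 23:59:59",
                "rates": {"acquisition_rate": 94.1, "yield_rate": 92.9, "total_rate": 87.4}
            }
        },
        "monthly": {
            "2024-06": {
                "start_time": "2024-06-01 00:00:00",
                "end_time": "2024-06-30 23:59:59",
                "rates": {"acquisition_rate": 93.5, "yield_rate": 91.8, "total_rate": 85.8}
            }
        }
    }
}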

__init__(path, reset=False, qc=True, **kwargs)

Core initialization method for reading raw data from different instruments.

Parameters:

- path (str | Path, required): The directory containing the raw data files.
- reset (bool | str, default False): Whether to reprocess the raw data before reading ('append' appends new data).
- qc (bool | str, default True): Whether to apply quality control (a frequency string enables periodic rate reports).
- **kwargs (dict): Additional keyword arguments passed to the reader.
Source code in AeroViz/rawDataReader/core/__init__.py
def __init__(self,
             path: Path | str,
             reset: bool | str = False,
             qc: bool | str = True,
             **kwargs):
    """A core initialized method for reading raw data from different instruments.

    Parameters
    ----------
    path : str | Path
        The directory containing the raw data files.
    reset : bool | str
        Whether to reprocess the raw data before reading ('append' appends new data).
    qc : bool | str
        Whether to apply quality control (a frequency string enables periodic rate reports).
    **kwargs : dict
        Additional keyword arguments passed to the reader.

    """
    self.path = Path(path)
    self.meta = meta[self.nam]
    output_folder = self.path / f'{self.nam.lower()}_outputs'
    output_folder.mkdir(parents=True, exist_ok=True)

    self.logger = ReaderLogger(
        self.nam, output_folder,
        kwargs.get('log_level', 'INFO').upper() if not kwargs.get('suppress_warnings') else 'ERROR')

    self.reset = reset is True
    self.append = reset == 'append'
    self.qc = qc  # if qc, then calculate rate
    self.qc_freq = qc if isinstance(qc, str) else None
    self.kwargs = kwargs

    self.pkl_nam = output_folder / f'_read_{self.nam.lower()}.pkl'
    self.csv_nam = output_folder / f'_read_{self.nam.lower()}.csv'
    self.pkl_nam_raw = output_folder / f'_read_{self.nam.lower()}_raw.pkl'
    self.csv_nam_raw = output_folder / f'_read_{self.nam.lower()}_raw.csv'
    self.csv_out = output_folder / f'output_{self.nam.lower()}.csv'
    self.report_out = output_folder / 'report.json'

filter_error_status(_df, error_codes, special_codes=None) staticmethod

Filter data containing specified error status codes, with exact matching for special codes.

Parameters:

- _df (DataFrame, required): A DataFrame containing a 'Status' column.
- error_codes (list, required): A list of status codes for bitwise testing.
- special_codes (list, default None): A list of special status codes for exact matching.

Returns:

- DataFrame: The filtered DataFrame.

Notes

This function performs two types of filtering:
1. Bitwise filtering that checks whether any error_codes bits are set in the Status
2. Exact matching for special_codes
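
A short usage sketch (the status bits and the special code below are hypothetical):

import pandas as pd

from AeroViz.rawDataReader.core import AbstractReader

# Hypothetical status bits: 0x01 = lamp fault, 0x10 = flow fault; 255 = self-test (matched exactly).
df = pd.DataFrame({
    'Status': [0, 1, 16, 17, 255],
    'BC': [1.0, 2.0, 3.0, 4.0, 5.0],
})

masked = AbstractReader.filter_error_status(df, error_codes=[0x01, 0x10], special_codes=[255])
# Rows with Status 1, 16, 17, and 255 are masked to NaN; only the Status 0 row survives.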

Source code in AeroViz/rawDataReader/core/__init__.py
@staticmethod
def filter_error_status(_df, error_codes, special_codes=None):
    """Filter data containing specified error status codes and specially handle certain specific codes.

    Parameters
    ----------
    _df : pandas.DataFrame
        A DataFrame containing a 'Status' column
    error_codes : list
        A list of status codes for bitwise testing
    special_codes : list, optional
        List of special status codes for exact matching

    Returns
    -------
    pandas.DataFrame
        Filtered DataFrame

    Notes
    -----
    This function performs two types of filtering:
    1. Bitwise filtering that checks if any error_codes are present in the Status
    2. Exact matching for special_codes
    """
    # Create an empty mask
    error_mask = pd.Series(False, index=_df.index)

    # Convert Status to integer (if it's not already)
    status_values = pd.to_numeric(_df['Status'], errors='coerce').fillna(0).astype(int)

    # Bitwise test normal error codes
    for code in error_codes:
        # Use bitwise operation on the integer-converted status_values
        error_mask = error_mask | ((status_values & code) != 0)

    # Exact matching for special codes
    if special_codes:
        error_mask = error_mask | status_values.isin(special_codes)

    # Mask rows containing any errors
    return _df.mask(error_mask)

reorder_dataframe_columns(df, order_lists, keep_others=False) staticmethod

Reorder DataFrame columns.
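
A short usage sketch (the column names are hypothetical):

import pandas as pd

from AeroViz.rawDataReader.core import AbstractReader

df = pd.DataFrame(columns=['RH', 'BC', 'PM25', 'Temp'])

# Put BC and PM25 first, then Temp; keep_others appends the remaining columns (RH).
ordered = AbstractReader.reorder_dataframe_columns(
    df, order_lists=[['BC', 'PM25'], ['Temp']], keep_others=True)
print(list(ordered.columns))  # ['BC', 'PM25', 'Temp', 'RH']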

Source code in AeroViz/rawDataReader/core/__init__.py
@staticmethod
def reorder_dataframe_columns(df, order_lists: list[list], keep_others: bool = False):
    """Reorder DataFrame columns."""
    new_order = []

    for order in order_lists:
        # Only add columns that exist in the DataFrame, without adding them repeatedly
        new_order.extend([col for col in order if col in df.columns and col not in new_order])

    if keep_others:
        # Add all original fields not in the new order list, keeping their original order
        new_order.extend([col for col in df.columns if col not in new_order])

    return df[new_order]