Фильтрация шума сигнала

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Фильтрация шума очень важная вещ, при работе с различными датчиками. Сигнал, получаемый от них всегда приходит с шумами, и важно уметь их грамотно отфильтровать. Качественная фильтрация шума способна уменьшить погрешность и увеличить качество измерения датчика. Этим мы сегодня и займемся.

Примерами будут три графика - синусоида, квадратный сигнал и треугольный сигнал.

Код для вывода графиков
import matplotlib.pyplot as plt
from math import sin

# Generals variables
length = 30
resolution = 20

# Creating arrays with graphic
sinus_g = [sin(i / resolution) for i in range(length * resolution)]

square_g = [(1 if p > 0 else -1) for p in sinus_g]

triangle_g = []
t = -1
for _ in range(length * resolution):
    t = t+0.035 if t < 1 else -1
    triangle_g.append(t)

# Output of graphs
graphics = [sinus_g, square_g, triangle_g]

fig, axs = plt.subplots(3, 1)

for i in range(len(graphics)):
    axs[i].plot(graphics[i])

    axs[i].set_ylim([-2, 2]), axs[i].set_xlim([0, length*resolution]),
    axs[i].set_yticklabels([]), axs[i].set_xticklabels([])

plt.show()
синусоида, квадратный сигнал и треугольный сигнал
синусоида, квадратный сигнал и треугольный сигнал

Шум делится на два типа: постоянный шум с относительно стабильной амплитудой и случайные выбросы, вызванные внешними факторами. Симулировать это мы будем данным образом.

Функция для добавление шума
def noised(func, k=0.3, prob=0.03):
    o = []
    for p in func:
        r = (np.random.random()*2-1) * k

        # Standard noise and random emissions
        if np.random.random() < prob: c = p + r*7
        else: c = p + r

        o.append(c)
    return o

Среднее арифметическое

Как по мне, самый очевидный способ минимизировать шум. Для его реализации потребуется ввести буфер нескольких предыдущих значений, каждый раз, когда опрашивается датчик буфер сдвигается (первый элемент удаляется, а новое значение датчика добавляется в конец или как-нибудь по другому, главное фиксированный размер буфера). От его размера и будет зависеть результат и быстродействие кода. С фильтрацией алгоритм справляется очень неплохо. Но его проблема заключается в производительности. Контроллеру приходится делать множество вычислений с плавающей точкой, что может сказаться на скорости выполнения кода. Но если датчик не следует очень часто опрашивать, то этот метод отлично подойдет.

Код фильтрация графика средним арифметическим
def arith_mean(f, buffer_size=10):
    # Creating buffer
    if not hasattr(arith_mean, "buffer"):
        arith_mean.buffer = [f] * buffer_size

    # Move buffer to actually values ( [0, 1, 2, 3] -> [1, 2, 3, 4] )
    arith_mean.buffer = arith_mean.buffer[1:]
    arith_mean.buffer.append(f)

    # Calculation arithmetic mean
    mean = 0
    for e in arith_mean.buffer: mean += e
    mean /= len(arith_mean.buffer)

    return mean
buffer_size = 7
buffer_size = 7
buffer_size = 25
buffer_size = 25

Как мы видим, при увеличении буфера квадратный и треугольный сигналы сильно исказились, а синусоида сместилась. Проявляется запаздывание среднего значения. Так что, при использовании данного фильтра стоит аккуратнее подбирать размер буфера.

Медианный фильтр

Он скорее подойдет не как отдельный фильтр, а дополняющий к другому. Медианный фильтр отлично справляется со случайными выбросами. Если среднее арифметическое получая на вход (10, 12, 55), выдаст 25.67, то медиан выдаст 12. На первый взгляд не так просто понять как он устроен, но со своей задачей он справляется отлично. На просторах интернета я нашел лаконичное исполнение.

middle = (max(a,b) == max(b, c)) ? max(a, c) : max(b, min(a, c)); // c++
middle = max(a, c) if (max(a, b) == max(b, c)) else max(b, min(a, c)) # python
Код медианного фильтра
def median(f):
    # Creating buffer
    if not hasattr(median, "buffer"):
        median.buffer = [f] * 3

    # Move buffer to actually values ( [0, 1, 2] -> [1, 2, 3] )
    median.buffer = median.buffer[1:]
    median.buffer.append(f)

    # Calculation median
    a = median.buffer[0]
    b = median.buffer[1]
    c = median.buffer[2]
    middle = max(a, c) if (max(a, b) == max(b, c)) else max(b, min(a, c))

    return middle
Результат действия медианного фильтра
Результат действия медианного фильтра

Медианный фильтр отфильтровал почти все случайные выбросы. К тому же этот алгоритм совершенно прост в вычислении. И используя его в комбинации с каким-либо другим другим фильтром можно получить максимальный результат.

Экспоненциальное бегущее среднее и адаптивный коэффициент

Этот фильтр по своей сути схож с первым, а главное он более простой по вычислениям. Работает он так: к предыдущему фильтрованному значению прибавляется новое, и каждое из них помножено на собственный коэффициент, сумма которых равна 1. Коэффициент k подбирается от 0 до 1 и означает важность нового значения по сравнению с предыдущем, то есть чем больше k, тем больше важность нового нефильтрованного значения и фильтрованный график ближе к изначальному.

normalised = new * k + normalised * (1-k) # Эта формула
normalised += (new - normalised) * k # А лучше эта

Адаптивный коэффициент нужен для корректной фильтрации квадратных сигналов. Реализуется он в одну строчку и означает, что если предыдущее фильтрованное значение слишком далеко от нового (то есть корректный сигнал изменился), то k увеличиваем чтобы момент смены сигнала был четко виден, а иначе k оставляем нормальным.

k = s_k if (abs(new - normalised) < d) else max_k
Код фильтра упрощенного бегущего среднего с адаптивным коэффициент
def easy_mean(f, s_k=0.2, max_k=0.9, d=1.5):
    # Creating static variable
    if not hasattr(easy_mean, "fit"):
        easy_mean.fit = f

    # Adaptive ratio
    k = s_k if (abs(f - easy_mean.fit) < d) else max_k

    # Calculation easy mean
    easy_mean.fit += (f - easy_mean.fit) * k

    return easy_mean.fit

Работает достаточно хорошо, но из-за адаптивного коэффициента появляются некие артефакты в моментах с выбросами, так как они достаточно велики, чтобы определиться как составляющие квадратного сигнала. С этим можно бороться, увеличивая параметр d (требуемое расстояние для определения квадратного сигнала), но есть решение элегантнее. И тут нам как раз нам потребуется медианный фильтр. Если перед подачей данных на бегущее среднее, прогнать их через медиану, то можно получить качественный и быстродейственный алгоритм для фильтрации сигнала.

с использованием медианного фильтра
с использованием медианного фильтра

Все артефакты исчезли. Данная связка алгоритмов является одной из самых эффективных и быстродейственных. Она может быть применена практически везде.

Применение связки фильтров
def normalise(func):
    o = []
    for p in func:
        res = median(p)
        res = easy_mean(res)

        o.append(res)
    return o

Фильтр Калмана

Еще один фильтр, принцип работы которого не столь очевиден для каждого. Из всего кода вам необходимо знать, что r - это приблизительная амплитуда шума, а q следует подбирать самостоятельно. Если вы хотите разобраться в его устройстве поподробнее, можете прочитать эту статью с википедии, а код я взял и отредактировал с этой страницы.

Код фильтра Калмана
def kalman(f, q=0.25, r=0.7):
    if not hasattr(kalman, "Accumulated_Error"):
        kalman.Accumulated_Error = 1
        kalman.kalman_adc_old = 0

    if abs(f-kalman.kalman_adc_old)/50 > 0.25:
        Old_Input = f*0.382 + kalman.kalman_adc_old*0.618
    else:
        Old_Input = kalman.kalman_adc_old

    Old_Error_All = (kalman.Accumulated_Error**2 + q**2)**(1/2)
    H = Old_Error_All**2/(Old_Error_All**2 + r**2)
    kalman_adc = Old_Input + H * (f - Old_Input)
    kalman.Accumulated_Error = ((1 - H)*Old_Error_All**2)**(1/2)
    kalman.kalman_adc_old = kalman_adc

    return kalman_adc

Со своей задачей фильтр справляется, но он не всегда подойдет из-за множества вычислений с плавающей точкой.

Какой фильтр выбрать?

Это зависит от обстоятельств в которых фильтр будет использоваться. Естественно стоит попробовать каждый, но практически для каждого случая подойдёт вариант №2 из списка ниже. Напомню, что наиболее выгодная связка - медианного фильтра с другим. Пройдемся вкратце по перечисленным фильтрам из данной статьи:

  1. Медианный фильтр - самый быстродейственный алгоритм, применятся для отсеивания случайных выбросов, наиболее эффективен в связках с другими алгоритмами.

  2. Экспоненциальное бегущее среднее с адаптивным коэффициент - универсальный, и простой фильтр, подойдет в большинстве ситуаций

  3. Среднее арифметическое - эффективный, но не столь быстродейственный алгоритм

  4. Фильтр Калмана - универсальный способ фильтрации любого сигнала, но грамосткий по вычислениям

Кстати, если вы работаете со средой arduino, то там присутствует удобный инструмент визуализации графиков из значений переданных в порт, статья об этом. Это может упростить работу с сигналами.

И на последок исходники кодов, и еще несколько примеров...

Код для визуализация графиков
import matplotlib.pyplot as plt
from math import sin
from _f import *

# Generals variables
length = 30
resolution = 20

# Creating arrays with graphic
sinus_g = [sin(i / resolution) for i in range(length * resolution)]

square_g = [(1 if p > 0 else -1) for p in sinus_g]

triangle_g = []
t = -1
for _ in range(length * resolution):
    t = t+0.035 if t < 1 else -1
    triangle_g.append(t)

# Output of graphs
graphics = [sinus_g, square_g, triangle_g]

fig, axs = plt.subplots(3, 1)

for i in range(len(graphics)):
    noised_f = noised(graphics[i])

    axs[i].plot(noised_f, color='blue')
    axs[i].plot(graphics[i], color='black')
    axs[i].plot(normalise(noised_f), linewidth=3, color='red')

    axs[i].set_ylim([-2, 2]), axs[i].set_xlim([0, length*resolution]),
    axs[i].set_yticklabels([]), axs[i].set_xticklabels([])

plt.show()
Код с фильтрами
import numpy as np

np.random.seed(58008)


def normalise(func):
    o = []
    for p in func:
        res = median(p)
        res = easy_mean(res)

        o.append(res)
    return o


def noised(func, k=0.3, fitob=0.03):
    o = []
    for p in func:
        r = (np.random.random()*2-1) * k

        # Standard noise and random emissions
        if np.random.random() < fitob: c = p + r*7
        else: c = p + r

        o.append(c)
    return o


def arith_mean(f, buffer_size=10):
    # Creating buffer
    if not hasattr(arith_mean, "buffer"):
        arith_mean.buffer = [f] * buffer_size

    # Move buffer to actually values ( [0, 1, 2, 3] -> [1, 2, 3, 4] )
    arith_mean.buffer = arith_mean.buffer[1:]
    arith_mean.buffer.append(f)

    # Calculation arithmetic mean
    mean = 0
    for e in arith_mean.buffer: mean += e
    mean /= len(arith_mean.buffer)

    return mean


def easy_mean(f, s_k=0.2, max_k=0.9, d=1.5):
    # Creating static variable
    if not hasattr(easy_mean, "fit"):
        easy_mean.fit = f

    # Adaptive ratio
    k = s_k if (abs(f - easy_mean.fit) < d) else max_k

    # Calculation easy mean
    easy_mean.fit += (f - easy_mean.fit) * k

    return easy_mean.fit


def median(f):
    # Creating buffer
    if not hasattr(median, "buffer"):
        median.buffer = [f] * 3

    # Move buffer to actually values ( [0, 1, 2] -> [1, 2, 3] )
    median.buffer = median.buffer[1:]
    median.buffer.append(f)

    # Calculation median
    a = median.buffer[0]
    b = median.buffer[1]
    c = median.buffer[2]
    middle = max(a, c) if (max(a, b) == max(b, c)) else max(b, min(a, c))

    return middle


def kalman(f, q=0.25, r=0.7):
    if not hasattr(kalman, "Accumulated_Error"):
        kalman.Accumulated_Error = 1
        kalman.kalman_adc_old = 0

    if abs(f-kalman.kalman_adc_old)/50 > 0.25:
        Old_Input = f*0.382 + kalman.kalman_adc_old*0.618
    else:
        Old_Input = kalman.kalman_adc_old

    Old_Error_All = (kalman.Accumulated_Error**2 + q**2)**(1/2)
    H = Old_Error_All**2/(Old_Error_All**2 + r**2)
    kalman_adc = Old_Input + H * (f - Old_Input)
    kalman.Accumulated_Error = ((1 - H)*Old_Error_All**2)**(1/2)
    kalman.kalman_adc_old = kalman_adc

    return kalman_adc
фильтрация сильного шума с помощью kalman(p, r=3, q=0.4), arith_mean(res, buffer_size=5)
фильтрация сильного шума с помощью kalman(p, r=3, q=0.4), arith_mean(res, buffer_size=5)
просто линейный шум, нормализованный средним арифметическим с буфером в 20
просто линейный шум, нормализованный средним арифметическим с буфером в 20

Ещё интересный эксперимент: я построчно загрузил зашумленную картинку своего кота и пропустил её через фильтр Калмана.

Источник: https://habr.com/ru/post/588270/


Интересные статьи

Интересные статьи

Привет, Хабр!Меня зовут Дмитрий Матлах. Я тимлид в AGIMA. Мы с коллегами обратили внимание, что в сообществе часто возникает вопрос о том, как совместить на одном проекте Bitrix-компоненты и реактивны...
В прошлой части мы поговорили о советах директору по разработке бизнес-процесса в Битрикс24, сейчас же я постараюсь дать советы руководителям отделов, поскольку по моему опыту почти всегд...
Статья о вариантах усиления сигнала сотовой связи на различных объектах: дача, офис, склад..., так же краткий обзор популярных предложений на рынке. Существует два основных варианта ...
1С Битрикс: Управление сайтом (БУС) - CMS №1 в России по версии портала “Рейтинг Рунета” за 2018 год. На рынке c 2003 года. За это время БУС не стоял на месте, обрастал новой функциональностью...
Если вы последние лет десять следите за обновлениями «коробочной версии» Битрикса (не 24), то давно уже заметили, что обновляется только модуль магазина и его окружение. Все остальные модули как ...