Asterisk — это болид «Формулы-1», а не рейсовый автобус

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Asterisk — фи, это же моветон


Здравствуйте уважаемые читатели этого замечательного ресурса. По уже сложившейся традиции — являюсь давним читателем habr'а, но только сейчас решил сделать пост. Что, собственно, побудило к написанию? Честно сказать, и сам не знаю. То ли притянутые статьи о производительности FreeSWITCH/Yate/3CX/etc в сравнении с Asterisk, то ли действительные, реальные проблемы архитектуры последнего, а, возможно, желание сделать что-нибудь уникальное.


И что удивительно, в первом случае, как правило, сравнивают мягкое и теплое, так сказать, FreeSWITCH/Yate/etc и FreePBX. Да-да, именно FreePBX. Это не опечатка. Причем интересно, что во всех сравнениях зачастую один Asterisk в дефолтной конфигурации. Ну, вы знаете, эта конфигурация — загруженные все имеющиеся модули, кривой диалплан (FreePBX как бы способствует) и куча остальной необъективщины. Что до родовых болячек Asterisk'а — да, объективно их вагон и маленькая тележка.


Что со всем этим делать? Разрушать стереотипы и исправлять родовые травмы. Этим и займемся.


Скрещиваем ежа с ужом


Многие из новичков испытывают дискомфорт, глядя на синтаксис описания диалплана в Asterisk'е, а некоторые на полном серьезе обосновывают выбор другого сервера телефонии именно необходимостью писать диалплан в том виде, в котором он есть по дефолту. Типа, перелопачивать многострочный XML — это верх комфорта. Да, есть возможность юзать LUA/AEL, и это хорошо. Но лично я отнес бы эту возможность в минусы и в частности то, что касается pbx_lua.


Как уже было сказано, иметь возможность описывать диалплан полноценным языком программирования — это хорошо. Проблема в том, что время жизни скрипта и его окружения равно времени жизни канала. Для каждого нового канала запускается свой экземпляр скрипта, следовательно, прощай, разделяемые между каналами переменные, единичная загрузка сторонних модулей, один разделяемый коннект к базе и т.д., и т.п. Строго говоря, от встраиваемого языка описания сценария этого и не нужно, но уж сильно хочется. А если хочется – значит, надо.


Итак, от классического Asterisk'а возьмем принципы pbx_lua, от Yate возьмем модель маршрутизации, а от FreeSWITCH ничего брать не будем, ибо "overhead" не нужен. ОК, с тем, что нам нужно родить, определились. Что же будем использовать для генетических экспериментов:


  • Asterisk, причем хотелось бы без привязок к версии. Тот же ARI был анонсирован, если мне не изменяет память, с 12-й версии. Если учесть, что до сих пор где то юзаются 1.8/1.6, а возможно и 1.4, то зависимость от версионных плюшек нам не нужна.
  • Lua — замечательный, гибкий и крайне функциональный скриптовый язык. Сам бог велел, так сказать, без комментариев.
  • Lunapark — интересный проект на github'е, своего рода сервер voip-приложений.

Про Lunapark стоит рассказать подробнее. Это сервер, реализующий потенциал AMI-протокола в связке с классическим FastAGI, что немаловажно в едином пространстве выполнения. То есть, получаем аналог ARI посредством тесной кооперации AGI и AMI в одном флаконе.


Предвижу логичный вопрос: для чего это все? Есть же Asterisk REST Interface, чей функционал ты тут пытаешься переизобрести! Ответ на этот вопрос неоднозначен. Согласен, ARI декларирует ряд преимуществ: да, он асинхронен, да, позволяет работать с "сырыми" примитивами, WebSockets и да, стильный, модный, молодежный XML/JSON — куда ж без него. Но, черт возьми, часть этих так называемых преимуществ крайне сомнительна и добавляет один, а то и более уровней абстракции. Другая же часть — вообще не преимущества. Преимущества — это когда что-то свойственно только тебе, ниже мы это увидим на примере той же асинхронности.


Как это работает? Стандартными средствами заворачиваем канал в FastAGI-приложение, внутри которого получаем возможность управлять звонком, как будто юзаем pbx_lua с незначительным изменением синтаксиса. Вишенкой на торте является возможность управлять состоянием самого Asterisk’а и окружением канала, для этого в распоряжении текущего FastAGI-приложения есть глобальный AMI-объект. Кстати, можно не заворачивать канал в FastAGI-приложение, а создать глобальный обработчик события, допустим, для NewChannel. А это уже преимущество по сравнению с ARI, там как известно, вне stasis'а ARI слеп.


Реализован Lunapark в лучших традициях кооперативной многозадачности, а именно всеми любимая асинхронность на сопрограммах. И как следствие отсутствие проблем с "shared data". То есть плюсы присутствуют, но и проблемы появляются. Одна из них — это необходимость описывать логику с оглядкой на асинхронность, но я думаю, это мы как-нибудь переживем.


Что дальше?


Синтаксис описания контекстов — а что же с ним не так? Да все с ним нормально, более того, практику написания диалплана нужно прописывать как профилактику для формирования структурированного мышления у новичков. Но, вместе с тем нужно понижать порог вхождения. Поэтому будем упрощать и в то же время добавлять функционала.


Простой пример:


[test]
exten => _XXX/102,1,Hangup()
exten => _XXX,1,Dial(SIP/${EXTEN})

В этом примере идет дозвон до трехзнака, кроме абонента 102. Вроде бы все логично и лаконично за исключением того, что шаблоны соответствия экстеншена ограничены небольшим набором правил, а так называемая extended маршрутизация возможна только по CallerID звонящего. А хотелось бы, к примеру, по CallerIDName или по текущему состоянию звонящего канала, а возможно по имени самого канала, а если реализовать полноценный regexp, так вообще красота. И да, я знаю, все эти хотелки можно реализовать, расписав контекст в таком виде:


[test]
exten => _XXX/102,1,Hangup()

; по CallerIDName
exten => _XXX,1,ExecIf($[ "${CALLERID(name)}" == "Vasya" ]?Hangup())

; По состоянию канала
exten => _XXX,n,ExecIf($[ "${CHANNEL(state)}" != "Ring" ]?Hangup())

; По имени канала
exten => _XXX,n,ExecIf($[ "${CUT(CUT(CHANNEL,-,1),/,2)}" == "333" ]?Hangup())

exten => _XXX,n,Dial(SIP/${EXTEN})

Но мой внутренний перфекционист начинает бунтовать при виде такого, а если представить аналогичную выборку по всем пользователям, да еще и действия нужны разные и посложнее Hangup'а, то extensions.conf превращается в длииинную портянку вызовов Goto, GoSub, Macro и, не дай бог, с каналами типа Local.


Выход один — прикручивать свои правила маршрутизации с подкидным и дамами с низкой социальной ответственностью.


В качестве примера:


${Exten}:match('%d%d%d')
           and 
(
  ${CallerIDNum}:match('201') or 
  ${CallerIDName}:match('Vasya') or 
  ${State}:lower() ~= 'ring' or 
  ${Channel}:match('^[^/]+/([^%-]+)') == '333'
) => Hangup();

${Exten}:match('%d%d%d') => Dial {callee = ('SIP/%s'):format(${Exten})};

Хм, вырвиглазненько получилось, но на удивление читается и понимается с первого взгляда. А самое главное, что у нас появился аналог regexp'ов и группировка правил на действие, что, несомненно, упростит составление маршрутов в будущем.


Что тут думать, прыгать надо.


В итоге имеем Lunapark как замену pbx_lua. Его средствами нам и нужно создать логику обработки нашей модели маршрутизации. Для начала нужно распарсить набор правил и заменить все вхождения ${...} на соответствующие им значения, то есть привести к виду ('...'). Значения будут браться из окружения текущего канала.
Затем приводим каждое правило к виду условного оператора, чтобы получить нечто похожее:


-- Exten = 123
-- Sate = Ring
-- CallerIDNum = 100
-- CallerIDName = Test
-- Channel = SIP/100-00000012c

if ('123'):match('%d%d%d') and
(
  ('100'):match('201') or
  ('Test'):match('Vasya') or
  ('Ring'):lower() ~= 'ring' or
  ('SIP/100-00000012c'):match('^[^/]+/([^%-]+)') == '333'
) then
  Hangup()
end

if ('123'):match('%d%d%d') then
  Dial {callee = ('SIP/%s'):format(('123'))}
end

Делать это будут две функции fmt и syntax соответственно:


local fmt = function(str, tab)
 return (str:gsub('(%${[^}{]+})', function(w)
  local mark = w:sub(3, -2) 

  return (mark:gsub('(.+)',function(v)
   local out = tab[v] or v

   return ("('%s')"):format(out)
  end))
 end))
end

local syntax = function(str)
 return (str:gsub('([^;]+)=>([^;]+)',function(p,r)
  return ([[ 
   if %s then
    %s
   end
  ]]):format(p,r)
 end))
end

В принципе ничего сложного, все просто и понятно. Идем дальше — считывать наши правила будем из файла в переменную при старте сервиса, а парсить их уже при звонке с актуальным окружением. Считыванием правил займется функция routes.


local routes = function(...)
  local conf, content = ...

  local f, err = io.open(conf, "r")

  if io.type(f) ~= 'file' then
   log.warn(err)  -- Глобальный LOG объект доступный благодаря Lunapark'у
   return ""
  else
   content = f:read('*all')
  end

  f:close() return content
end

Осталось сделать две вещи: завернуть звонки в Lunapark и соответственно их обработать с учетом наших маршрутов. Тут стоит немного пояснить такой момент — в Lunapark вся логика описывается в handler'е. Это текстовый файл, в котором мы будем определять наши FastAGI-приложения и работать с AMI и нашими маршрутами.


Как уже было сказано, объект AMI — глобальный и, помимо роли AMI-клиента, может устанавливать своего рода слушатели для конкретных AMI событий. Этим мы и воспользуемся, но для начала сделаем некоторые приготовления в extensions.conf.


[default]
exten => _[hit],1,NoOp()
exten => _.,n,Wait(5)

exten => _.,1,AGI(agi://127.0.0.1/${EXTEN}${IF($[ "X${PRMS}" != "X" ]?"?${PRMS}")})

Wait(5) в примере выше позволит нам не обрывать канал при завершении FastAGI-приложения, так как в маршрутах может быть описано несколько приложений, а выполнение их осуществляется по средствам Redirect на контекст default по ${EXTEN}.


Таким образом, беря во внимание все выше описанное и помня о кооперативной природе Lunapark'а, попробуем закодить логику обработки маршрутов через FastAGI-приложения.


-- Считываем наши правила в переменную rules
local rules = routes('routes.conf')
-- Очищаем все обработчики, таким образом очистятся только не именные обработчики
-- Это даст возможность не затирать цепочку выполнения при сигналах HUP/QUIT
ami.removeEvents('*')
-- Обработчик события создания нового канала
ami.addEvents {
 ['newchannel'] = function(e)
  -- Условия, только каналы с набором и каналы с контекстом users
  if (e['Context'] and e['Context']:match('users')) and e['Exten'] then
   -- Переменная, указывающая на выполнении, какого FastAGI приложения мы находимся
   local step
   -- Будущий порядковый номер FatsAGI приложения в цепочке выполнения
   local count = 0
   -- Парсим маршруты для текущего окружения канала
   local code, err = loadstring(syntax(fmt(rules,e))) 
   -- В описании маршрутов нет ошибок, двигаемся дальше
   if type(code) == 'function' then
    -- Проксируем будущие FastAGI приложения 
    setfenv(code,setmetatable({indexes = {}},{__index = function(t,k)
     -- Вот они последствия кооперативности
     return coroutine.wrap(
      function(...)
       local prms = {} -- Будущие параметры FastAGI приложения
       local owner = t -- Копия окружения
       local event = e -- Копия таблицы event
       local thread = coroutine.running() -- ID текущей сопрограммы 
       -- Парсим параметры и приводим к виду URI
       for p,v in pairs({...}) do
        if type(v) == 'table' then
         for key, val in pairs(v) do
          table.insert(prms,("%s=%s"):format(key,val))
         end
        else
         table.insert(prms,("%s=%s"):format(p,v))
        end
       end
       -- Если это не первое FastAGI приложение в цепочке
       if step then
        -- Запоминаем предыдущее перед этим
        local last = ("%s"):format(step)
        -- Добавляем ИМЕННЫЕ обработчики события UserEvent по доп. условиям
        -- И записываем в таблицу indexes(в окружении) их порядковые номера
        -- Именные обработчики требуют последующего удаления самостоятельно
        table.insert(owner['indexes'],ami.addEvent('UserEvent',function(evt)
         -- Ловим событие AGIStatus указывающее на завершение приложения
         -- Если это предыдущее перед нами, пробуждаем сопрограмму
         if (evt['Channel'] and evt['Channel'] == event['Channel'])
               and
          (evt['UserEvent'] and evt['UserEvent']:match('AGIStatus'))
               and
          (evt['Script'] and evt['Script'] == last)
         then
          -- Соответствие порядкового номера нашей сопрограмме
          -- В цепочке может быть вызов одного приложения несколько раз
          -- Это позволит выполнять сопрограммы в порядке их определения
          if owner['indexes'][count] == thread then
           if coroutine.status(thread) ~= 'dead' then
            coroutine.resume(thread)
           end
          end
         end
        end,thread))
        -- Устанавливаем маркер текущего FastAGI приложения
        step = k
        -- Приостанавливаем сопрограмму
        coroutine.yield()
       else -- Здесь обрабатывается первое FastAGI приложение в цепочке
        local index -- Индекс для обработчика Hangup события
        -- Устанавливаем маркер текущего FastAGI приложения
        step = k
        -- Добавляем ИМЕННОЙ обработчик события Hangup для канала
        -- В этом месте подчищаем за собой
        index = ami.addEvent('Hangup',function(evt)
         if evt['Channel'] and evt['Channel'] == event['Channel'] then
          -- Удаляем обработчик событие Hangup по ранее запомненному индексу
          ami.removeEvent('Hangup',index)
          -- Удаляем все обработчики цепочек выполнения по индексу
          for _,v in pairs(owner['indexes']) do
           ami.removeEvent('UserEvent',v)
          end
          -- Делаем приятно сборщику мусора
          owner = nil
         end
        end,thread)
       end
       -- По средствам AMI выставляем переменную для канала и вызова в цепочке
       ami.setvar{
        Value = table.concat(prms,'&'),
        Channel = event['Channel'],
        Variable = 'PRMS'
       }
       -- Перенаправляем канал на AGI-приложение через контекст default
       ami.redirect{
        Exten = k,
        Priority = 1,
        Channel = event['Channel'],
        Context = 'default'
       }
       -- Выставляем индекс приложения
       count = count + 1
      end)
    end}))()
   else
    -- Если что-то пошло не так
    log.warn(err)
   end
  end
 end
}

Для тех, кто не смотрел или не понял код, поясню простым языком, что же мы там накодили. Для каждого действия в маршруте создается сопрограмма с копированием всего, чего нужно через замыкания и в приостановленном состоянии. Кроме сопрограммы первого действия, она выполняется как есть. Приостанавливаем мы их для эмуляции последовательного выполнения, то есть строго одна за другой, вторая после первой, третья после второй и т.д.


Стоп, так они и так друг за другом выполняются. На самом деле нет — если не эмулировать синхронность, то асинхронный redirect пробежится по каждому действию и займет это доли секунды. В нашем же коде мы выполняем каждое действие, по наступлению определенного события, а именно завершении предыдущего FastAGI-приложения. Lunapark заботливо генерирует специальный UserEvent по окончании выполнения каждого FastAGI-приложения с соответствующими параметрами — вот на это событие и ориентируемся. Сами же сопрограммы просто редиректят текущий канал в контекст default с экстеншном, равным текущему действию, предварительно установив переменную канала PRMS.


Самое интересное, что звонок после redirect'а придет опять в handler, но уже в контексте выполнения AGI и на соответствующее приложение. В нашем случае это Hangup() и Dial(). Давайте же напишем их для полноты повествования.


function Hangup(...)
  local app, channel = ... -- В этом отличие от pbx_lua

  app.verbose(('The Channel %s does not match by routing rules'):format(channel.get('CHANNEL')))
  app.hangup()
end

function Dial(...)
  local app, channel = ...
  local leg = app.agi.params['callee'] or ''

  app.verbose(('Trying to make a call from %s to %s'):format(
   channel.get('CALLERID(num)'),
   leg:match('^[^/]+/([^%-]+)'))
  )
  app.dial(leg)
end

Ну, вот и все — допрыгались


Итак, давайте подытожим. Что же мы получили в результате этих генетических экспериментов?


  • гибкий, функциональный подход описания маршрутов;
  • возможность создания полнофункциональных VoIP-приложений. Теперь нам не нужно приложение Queue, мы можем его сами написать, не заморачиваясь с созданием своего собственного модуля для asterisk'а;
  • вынесли логику формирования, управления звонками на сторону сервера VoIP-приложений, тем самым сделав из asterisk'а Mediahub, что позволило повысить производительность VoIP-системы в целом;
  • возможность использовать достаточно простой, расширяемый и очень гибкий скриптовый язык для создания VoIP-приложений;
  • расширили возможности интеграции с внешними системами из VoIP-приложений.

Кому как, а мне пока все нравится.


handler целиком
local fmt = function(str, tab)
 return (str:gsub('(%${[^}{]+})', function(w)
  local mark = w:sub(3, -2) 

  return (mark:gsub('(.+)',function(v)
   local out = tab[v] or v

   return ("('%s')"):format(out)
  end))
 end))
end

local syntax = function(str)
 return (str:gsub('([^;]+)=>([^;]+)',function(p,r)
  return ([[ 
   if %s then
    %s
   end
  ]]):format(p,r)
 end))
end

local routes = function(...)
  local conf, content = ...

  local f, err = io.open(conf, "r")

  if io.type(f) ~= 'file' then
   log.warn(err)  -- Глобальный LOG объект доступный благодаря Lunapark'у
   return ""
  else
   content = f:read('*all')
  end

  f:close() return content
end

-- Считываем наши правила в переменную rules
local rules = routes('routes.conf')
-- Очищаем все обработчики, причем таким образом очистятся только неименные обработчики событий
-- Это даст возможность не затирать цепочку выполнения при сигналах HUP/QUIT
ami.removeEvents('*')
-- Обработчик события создания нового канала
ami.addEvents {
 ['newchannel'] = function(e)
  -- Условия, только каналы с набором и каналы с контекстом users
  if (e['Context'] and e['Context']:match('users')) and e['Exten'] then
   local step -- Переменная, указывающая на выполнении, какого FastAGI приложения мы находимся
   local count = 0 -- Будущий порядковый номер FatsAGI приложения в цепочке выполнения
   -- Парсим маршруты для текущего окружения канала
   local code, err = loadstring(syntax(fmt(rules,e))) 
   -- В описании маршрутов нет ошибок, двигаемся дальше
   if type(code) == 'function' then
    -- Проксируем будущие FastAGI приложения 
    setfenv(code,setmetatable({indexes = {}},{__index = function(t,k)
     -- Вот они последствия кооперативности
     return coroutine.wrap(
      function(...)
       local prms = {} -- Будущие параметры FastAGI приложения
       local owner = t -- Копия окружения
       local event = e -- Копия таблицы event
       local thread = coroutine.running() -- ID текущей сопрограммы 
       -- Парсим параметры и приводим к виду URI
       for p,v in pairs({...}) do
        if type(v) == 'table' then
         for key, val in pairs(v) do
          table.insert(prms,("%s=%s"):format(key,val))
         end
        else
         table.insert(prms,("%s=%s"):format(p,v))
        end
       end
       -- Если это не первое FastAGI приложение в цепочке
       if step then
        -- Запоминаем предыдущее перед этим
        local last = ("%s"):format(step)
        -- Добавляем ИМЕННЫЕ обработчики события UserEvent по доп. условиям
        -- И записываем в таблицу indexes(в окружении) их порядковые номера
        -- Именные обработчики требуют последующего удаления самостоятельно
        table.insert(owner['indexes'],ami.addEvent('UserEvent',function(evt)
         -- Ловим событие AGIStatus указывающее на завершение приложения
         -- Если это предыдущее перед нами, пробуждаем сопрограмму
         if (evt['Channel'] and evt['Channel'] == event['Channel'])
               and
          (evt['UserEvent'] and evt['UserEvent']:match('AGIStatus'))
               and
          (evt['Script'] and evt['Script'] == last)
         then
          -- Соответствие порядкового номера нашей сопрограмме
          -- В цепочке может быть вызов одного приложения несколько раз
          -- Это позволит выполнять сопрограммы в порядке их определения
          if owner['indexes'][count] == thread then
           if coroutine.status(thread) ~= 'dead' then
            coroutine.resume(thread)
           end
          end
         end
        end,thread))
        -- Устанавливаем маркер текущего FastAGI приложения
        step = k
        -- Приостанавливаем сопрограмму
        coroutine.yield()
       else -- Здесь обрабатывается первое FastAGI приложение в цепочке
        local index -- Индекс для обработчика Hangup события
        -- Устанавливаем маркер текущего FastAGI приложения
        step = k
        -- Добавляем ИМЕННОЙ обработчик события Hangup для канала
        -- В этом месте подчищаем за собой
        index = ami.addEvent('Hangup',function(evt)
         if evt['Channel'] and evt['Channel'] == event['Channel'] then
          -- Удаляем обработчик событие Hangup по ранее запомненному индексу
          ami.removeEvent('Hangup',index)
          -- Удаляем все обработчики цепочек выполнения по индексу
          for _,v in pairs(owner['indexes']) do
           ami.removeEvent('UserEvent',v)
          end
          -- Делаем приятно сборщику мусора
          owner = nil
         end
        end,thread)
       end
       -- По средствам AMI выставляем переменную для канала и вызова в цепочке
       ami.setvar{
        Value = table.concat(prms,'&'),
        Channel = event['Channel'],
        Variable = 'PRMS'
       }
       -- Перенаправляем канал на AGI-приложение через контекст default
       ami.redirect{
        Exten = k,
        Priority = 1,
        Channel = event['Channel'],
        Context = 'default'
       }
       -- Выставляем индекс приложения
       count = count + 1
      end)
    end}))()
   else
    -- Если что-то пошло не так
    log.warn(err)
   end
  end
 end
}

function Hangup(...)
  local app, channel = ... -- В этом отличие от pbx_lua

  app.verbose(('The Channel %s does not match by routing rules'):format(channel.get('CHANNEL')))
  app.hangup()
 end

function Dial(...)
  local app, channel = ...
  local leg = app.agi.params['callee'] or ''

  app.verbose(('Trying to make a call from %s to %s'):format(
   channel.get('CALLERID(num)'),
   leg:match('^[^/]+/([^%-]+)'))
  )
  app.dial(leg)
 end
Источник: https://habr.com/ru/post/513572/


Интересные статьи

Интересные статьи

Нередко при работе с Bitrix24 REST API возникает необходимость быстро получить содержимое определенных полей всех элементов какого-то списка (например, лидов). Традиционн...
Существует традиция, долго и дорого разрабатывать интернет-магазин. :-) Лакировать все детали, придумывать, внедрять и полировать «фишечки» и делать это все до открытия магазина.
В 2019 году люди знакомятся с брендом, выбирают и, что самое главное, ПОКУПАЮТ через интернет. Сегодня практически у любого бизнеса есть свой сайт — от личных блогов, зарабатывающих на рекламе, до инт...
Несмотря на то, что “в коробке” с Битриксом уже идут модули как для SOAP (модуль “Веб сервисы” в редакции “Бизнес” и старше), так и для REST (модуль “Rest API” во всех редакциях, начиная с...
Если вы последние лет десять следите за обновлениями «коробочной версии» Битрикса (не 24), то давно уже заметили, что обновляется только модуль магазина и его окружение. Все остальные модули как ...