Когда официальные гайды не такие и полезные или WebSocket на ktor

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!

Мы все похожи на этого кота, когда узнаем новое
Мы все похожи на этого кота, когда узнаем новое

Всем привет! Это один из первых моих постов, поэтому не судите строго. Сегодня хочу поделиться тем как мы добавляли поддержку сокета в наше приложение. Так получилось что сокет у нас односторонний и отправлять нам ничего в него не нужно, но это еще не значит, что проблем стало меньше.

Немного о себе

Являюсь лидом андроид мобильной команды разработки в небольшой финтех компании PeterPartner.

Что мы хотели?

Открыть сокет и получать из него информацию. Отправлять нам ничего, к счастью, не нужно было.

Что имеем?

  1. Мулитиплатформенное приложение для Android/iOS

  2. KotlinFlow

  3. MVI


ТЗ

Итак начнем. Первым делом с командой обсудили, что мы хотим от сокета. Выяснилось что не так и много:

  • incoming: Flow<Result<T>> - собственно сообщения которые падают в сокет. Почему Result скажу чуть позже, почему T думаю не надо объяснять

  • connectState: Flow<ConnectState> - состояние подключения к сети сокета

  • isOpen: Boolean - еще одно состояние сокета ниже объясню почему их два

  • fun close() - тут все просто, нам нужно уметь закрывать сокет.

Получился вот такой интерфейс
/**
 * Web socket session with receive type [I]. Open automatically with subscribe on [incoming]
 */
interface WSSession<T> {
    /**
     * Incoming messages flow
     */
    val incoming: Flow<Result<T>>

    /**
     * Flush and close WS
     */
    suspend fun close()

    /**
     * WS network connect state. Empty before first connect
     */
    val connectState: Flow<ConnectState>

    /**
     * return true while not called [close] after return false.
     *
     * while socket closing also return false
     *
     * while socket not open first time also return true
     */
    val isOpen: Boolean
}


enum class ConnectState {
    Connected,
    Disconnected
}

Реализация

Полный пример
inline fun <reified T> HttpClient.wsSession(
    url: String,
    connectivityProvider: ConnectivityProvider,
): WSSession<T> = object : WSSession<T> {
    private var isClosed = false
    private var innerWSSession: WebSocketSession? = null

    override val connectState = MutableSharedFlow<ConnectState>(1)
    override val isOpen: Boolean
        get() = !isClosed

    override val incoming: Flow<Result<T>> = flow<Result<T>> {
        if (isClosed) return@flow
        val ws = webSocketSession(urlString = url)
        innerWSSession = ws
        if (isClosed) {
            ws.close()
            emit(Result.failure(NoData))
            return@flow
        }
        connectState.emit(ConnectState.Connected)
        emitAll(ws.incoming.receiveAsFlow().map { frame -> requestWrapperWs(frame) })
    }.retry { cause: Throwable ->
        if (cause is IOException || cause is HttpRequestTimeoutException) {
            connectState.emit(ConnectState.Disconnected)
            connectivityProvider.waitConnect()
        }
        true
    }.filter { isOpen }

    override suspend fun close() {
        isClosed = true
        innerWSSession?.close()
    }

}


suspend inline fun <reified T> requestWrapperWs(
    frame: Frame,
): Result<T> = requestWrapperWs(frame) {
    when (this) {
        is Frame.Close -> throw CancellationException(this.data.decodeToString())
        is Frame.Ping -> throw NoData
        is Frame.Pong -> throw NoData
        is Frame.Text -> commonJsonConfig.decodeFromString(this.readText())
        is Frame.Binary -> commonJsonConfig.decodeFromString(this.data.decodeToString())
        else -> throw NoData
    }
}

suspend inline fun <reified T> requestWrapperWs(
    frame: Frame,
    modifier: Frame.() -> T,
): Result<T> = catching { frame.modifier() }

val commonJsonConfig = Json {
    ignoreUnknownKeys = true
    allowSpecialFloatingPointValues = true
}

/**
 * no data - Anywhere. When data not set
 */
object NoData : Throwable()

Хотелось открывать сокет без использования suspend, что дало возможность использовать его как поле в местах где это было нужно.
Из интересно в реализации стоит выделить то, что закрыть сокет гораздо большая проблема чем открыть.

Теперь немного детальнее о Result

Т.к. формат сообщений хоть и ожидается нужный, но никто этого гарантировать не может. Поэтому было решено использовать Result как сущность которая имеет валидное и не очень значение. Под не валидным подразумеваются такие вещи как Ping, Close, сообщения которые мы не обрабатываем.

Зачем нам два состояния isOpen?

Изначально хотели сделать только одно поле connectState ,но потом поняли, что это не одно и тоже. Что же такое isOpen и connectState и почему их два?

isOpen - используется как флаг того что сокет должен быть открыт. Имеется ввиду, что сейчас он может быть не открыт (еще, нету интернета, какая-то ошибка внутри или любая другая причина), но сокет будет пытаться переоткрыться и хранить в себе данные до тех пор пока это нам нужно.

connectState - используется как флаг того что сокет реально открыт и данные по нему ходят.

Внимательный читатель мог заметить, что есть такое поле как connectivityProvider так вот его реализация может быть любая на ваше усмотрение. Нам нужен от него только метод waitConnect , который является suspend функцией и ждет когда система скажет что у нас есть доступ к интернету.

Как этим пользоваться?

Пример кода
var lastOpenSession: WSSession<*>? = null
_url.onEach { url ->
    lastOpenSession?.close()
    val session = httpClient.wsSession(
        url = url,
        connectivityProvider = connectivityProvider,
    )
    lastOpenSession = session
    session.incoming.onSuccess {
        //your code
    }.launchIn(CoroutineScope(Dispatchers.Default))
    session.connectState.onEach { state ->
        _hasProgress.value = state == ConnectState.Disconnected
    }.launchIn(this)
}.launchIn(storeScope).invokeOnCompletion {
    CoroutineScope(Dispatchers.Default).launch { lastOpenSession?.close() }
}

Почему такая странная реализация вызова? В нашем приложении Store умирает как только пользователь выходит с экрана. Поэтому метод close не будет вызван корректно. Пришлось придумать некоторый костыль если его можно считать таковым. Данный пример откроет сокет для конкретного URL и если он сменится, то сокет закроется и откроется новый.

Итог

Спасибо всем кто дочитал до конца! Надеюсь я кому-то помог открыть свой первый сокет. Пишите в комментариях, что можно улучшить как по коду так и по статье.

Источник: https://habr.com/ru/articles/779866/


Интересные статьи

Интересные статьи

В статье расскажу пару случаев из практики анализа защиты приложений Android. Имена и место событий изменены. Любые совпадения — случайность
Rust набирает популярность среди разработчиков. Мы, организаторы RustCon, решили опубликовать серию статей о том, по каким причинам компании начинают использовать его для своих проектов. Первая с...
Могу честно сказать — я побаиваюсь CSS. За последние годы он неслабо разросся, но вместе с этим пришла и монструозность (ну то есть чего вы всерьёз не можете сделать на C...
«Но это же в любом учебнике по сетям написано!» — возмутится нетерпеливый читатель. Однако, не нужно спешить с выводами. Написано по этому поводу много, но, к сожалению, далеко не всег...
В этой статье описываются некоторые методы ускорения загрузки фронтенд-приложений, чтобы реализовать отзывчивый, быстрый пользовательский интерфейс. Мы обсудим общую архитектуру фронтенда, как...