Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Всем привет! Это один из первых моих постов, поэтому не судите строго. Сегодня хочу поделиться тем как мы добавляли поддержку сокета в наше приложение. Так получилось что сокет у нас односторонний и отправлять нам ничего в него не нужно, но это еще не значит, что проблем стало меньше.
Немного о себе
Являюсь лидом андроид мобильной команды разработки в небольшой финтех компании PeterPartner.
Что мы хотели?
Открыть сокет и получать из него информацию. Отправлять нам ничего, к счастью, не нужно было.
Что имеем?
Мулитиплатформенное приложение для Android/iOS
KotlinFlow
MVI
ТЗ
Итак начнем. Первым делом с командой обсудили, что мы хотим от сокета. Выяснилось что не так и много:
incoming: Flow<Result<T>> - собственно сообщения которые падают в сокет. Почему Result скажу чуть позже, почему T думаю не надо объяснять
connectState: Flow<ConnectState> - состояние подключения к сети сокета
isOpen: Boolean - еще одно состояние сокета ниже объясню почему их два
fun close() - тут все просто, нам нужно уметь закрывать сокет.
Получился вот такой интерфейс
/**
* Web socket session with receive type [I]. Open automatically with subscribe on [incoming]
*/
interface WSSession<T> {
/**
* Incoming messages flow
*/
val incoming: Flow<Result<T>>
/**
* Flush and close WS
*/
suspend fun close()
/**
* WS network connect state. Empty before first connect
*/
val connectState: Flow<ConnectState>
/**
* return true while not called [close] after return false.
*
* while socket closing also return false
*
* while socket not open first time also return true
*/
val isOpen: Boolean
}
enum class ConnectState {
Connected,
Disconnected
}
Реализация
Полный пример
inline fun <reified T> HttpClient.wsSession(
url: String,
connectivityProvider: ConnectivityProvider,
): WSSession<T> = object : WSSession<T> {
private var isClosed = false
private var innerWSSession: WebSocketSession? = null
override val connectState = MutableSharedFlow<ConnectState>(1)
override val isOpen: Boolean
get() = !isClosed
override val incoming: Flow<Result<T>> = flow<Result<T>> {
if (isClosed) return@flow
val ws = webSocketSession(urlString = url)
innerWSSession = ws
if (isClosed) {
ws.close()
emit(Result.failure(NoData))
return@flow
}
connectState.emit(ConnectState.Connected)
emitAll(ws.incoming.receiveAsFlow().map { frame -> requestWrapperWs(frame) })
}.retry { cause: Throwable ->
if (cause is IOException || cause is HttpRequestTimeoutException) {
connectState.emit(ConnectState.Disconnected)
connectivityProvider.waitConnect()
}
true
}.filter { isOpen }
override suspend fun close() {
isClosed = true
innerWSSession?.close()
}
}
suspend inline fun <reified T> requestWrapperWs(
frame: Frame,
): Result<T> = requestWrapperWs(frame) {
when (this) {
is Frame.Close -> throw CancellationException(this.data.decodeToString())
is Frame.Ping -> throw NoData
is Frame.Pong -> throw NoData
is Frame.Text -> commonJsonConfig.decodeFromString(this.readText())
is Frame.Binary -> commonJsonConfig.decodeFromString(this.data.decodeToString())
else -> throw NoData
}
}
suspend inline fun <reified T> requestWrapperWs(
frame: Frame,
modifier: Frame.() -> T,
): Result<T> = catching { frame.modifier() }
val commonJsonConfig = Json {
ignoreUnknownKeys = true
allowSpecialFloatingPointValues = true
}
/**
* no data - Anywhere. When data not set
*/
object NoData : Throwable()
Хотелось открывать сокет без использования suspend
, что дало возможность использовать его как поле в местах где это было нужно.
Из интересно в реализации стоит выделить то, что закрыть сокет гораздо большая проблема чем открыть.
Теперь немного детальнее о Result
Т.к. формат сообщений хоть и ожидается нужный, но никто этого гарантировать не может. Поэтому было решено использовать Result
как сущность которая имеет валидное и не очень значение. Под не валидным подразумеваются такие вещи как Ping, Close, сообщения которые мы не обрабатываем.
Зачем нам два состояния isOpen
?
Изначально хотели сделать только одно поле connectState
,но потом поняли, что это не одно и тоже. Что же такое isOpen
и connectState и почему их два?
isOpen
- используется как флаг того что сокет должен быть открыт. Имеется ввиду, что сейчас он может быть не открыт (еще, нету интернета, какая-то ошибка внутри или любая другая причина), но сокет будет пытаться переоткрыться и хранить в себе данные до тех пор пока это нам нужно.
connectState
- используется как флаг того что сокет реально открыт и данные по нему ходят.
Внимательный читатель мог заметить, что есть такое поле как connectivityProvider
так вот его реализация может быть любая на ваше усмотрение. Нам нужен от него только метод waitConnect
, который является suspend функцией и ждет когда система скажет что у нас есть доступ к интернету.
Как этим пользоваться?
Пример кода
var lastOpenSession: WSSession<*>? = null
_url.onEach { url ->
lastOpenSession?.close()
val session = httpClient.wsSession(
url = url,
connectivityProvider = connectivityProvider,
)
lastOpenSession = session
session.incoming.onSuccess {
//your code
}.launchIn(CoroutineScope(Dispatchers.Default))
session.connectState.onEach { state ->
_hasProgress.value = state == ConnectState.Disconnected
}.launchIn(this)
}.launchIn(storeScope).invokeOnCompletion {
CoroutineScope(Dispatchers.Default).launch { lastOpenSession?.close() }
}
Почему такая странная реализация вызова? В нашем приложении Store умирает как только пользователь выходит с экрана. Поэтому метод close
не будет вызван корректно. Пришлось придумать некоторый костыль если его можно считать таковым. Данный пример откроет сокет для конкретного URL и если он сменится, то сокет закроется и откроется новый.
Итог
Спасибо всем кто дочитал до конца! Надеюсь я кому-то помог открыть свой первый сокет. Пишите в комментариях, что можно улучшить как по коду так и по статье.