Всем доброго дня! С вами Анна Жаркова, ведущий мобильный разработчик компании Usetech. Продолжаем рассматривать способы многопоточный работы в Kotlin Native.
В предыдущей части мы посмотрели некоторые нюансы работы с корутинами, как работать с Worker и AtomicReference.
Еще одним возможным API для работы с многопоточностью является DetachedObjectGraph.
//код Supranatural concurrency
override fun <T, V> execute(jobInput: T, job: (T) -> V): Future<V> {
val deferred = DeferredFuture<V>()
val detached = DetachedObjectGraph { Triple(jobInput, job, deferred).toImmutable() }.asCPointer()
dispatch_async_f(dispatch_get_main_queue(), detached, staticCFunction { it: COpaquePointer? ->
initRuntimeIfNeeded()
val attached = DetachedObjectGraph<Triple<T, (T) -> V, DeferredFuture<V>>>(it).attach()
val result = attached.second(attached.first)
attached.third.setValue(result)
})
return deferred
}
Чем оно примечательно, так тем, что это способ передавать объекты между потоками без заморозки. За счет чего это достигается:
1. Входные параметры, ссылки на блоки и коллбэки, которые надо вызвать и передать в другом потоке, кладутся в качестве общего содержимого вовнутрь DetachedObjectGraph.
2. Затем получаем указатель на открепленный подграф с нашим объектом с помощью asCPointer. И уже внутри нужного потока вызываем staticCFunction, где в качестве параметра работаем с нашим указателем.
3. Для того, чтобы извлечь из графа параметры, надо его прикрепить с помощью команды attach, извлечь упакованные данные и преобразовать нужным способом.
4. А вот вернуть коллбэк нужно вне этого потока.
Важно! Attach открепленного объекта можно вызвать один раз. Иначе можно получить исключение IllegalStateException: Illegal transfer state. Поэтому ссылка на граф должна зануляться после окончания работы блока.
Также при упаковке объекта полезно указать TransferMode.SAFE. Но в API DetachedObjectGraph этот параметр используется по умолчанию.
Для удобства можно сделать обертку, которая позволит работать с изменяемыми элементами в разных потоках, соединив и worker, и AtomicReference, и DetachedGraphObject:
class SharedDetachedObject<T:Any>(producer: () -> T) {
private val adog : AtomicReference<DetachedObjectGraph<Any>?>
private val lock = Any()
init {
val detachedObjectGraph = DetachedObjectGraph { producer() as Any }.freeze()
adog = AtomicReference(detachedObjectGraph.freeze())
}
fun <R> access(block: (T) -> R): R = trySynchronized(lock){
val holder = FreezableAtomicReference<Any?>(null)
val producer = { grabAccess(holder, block) as Any }
adog.value = DetachedObjectGraph(TransferMode.SAFE, producer).freeze()
val result = holder.value!!
holder.value = null
result as R
}
private fun <R> grabAccess(holder:FreezableAtomicReference<Any?>, block: (T) -> R):T{
val attach = adog.value!!.attach()
val t = attach as T
holder.value = block(t)
return t
}
}
Красиво и сложно, местами даже слишком.
Теперь рассмотрим, как наладить коммуникацию между контекстами и частями нативного запроса нашего клиента с помощью корутин. Для этого нам доступен следующий функционал:
•Channels
•Flows
•CompletableDeffered
Начнем с CompletaleDeferred. Данный механизм позволит нам awaitable, результат работы которого мы можем вернуть вместо callback в suspend функции:
class DefferedResponseReader: NSObject(), NSURLSessionDataDelegateProtocol {
private var chunks = ByteArray(0).atomic()
private var rawResponse = CompletableDeferred<Response>()
suspend fun awaitResponse(): Response {
return rawResponse.await().share()
}
override fun URLSession(
session: NSURLSession,
task: NSURLSessionTask,
didCompleteWithError: NSError?
) {
val response = task.response as NSHTTPURLResponse
completed(response.statusCode,didCompleteWithError as? Error)
}
fun completed(code: Long, error: Error?) {
val content = chunks.value.string()
if (!rawResponse.isCompleted) {
NSLog("completed: %s",content)
rawResponse.complete(Response(code, content, error))
clearChunks()
} else {
NSLog("already completed:")
}
}
Так как такой код мы можем вызвать только в корутине, то модернизируем и запрос:
class HttpDefferedEngine {
val engineJob = SupervisorJob()
val engineScope: CoroutineScope = CoroutineScope(defaultDispatcher
+ engineJob)
suspend fun request(request: Request): Response {
val reader = DefferedResponseReader()
val urlSession =
NSURLSession.sessionWithConfiguration(
NSURLSessionConfiguration.defaultSessionConfiguration, responseReader,//.share(),
delegateQueue = NSOperationQueue.currentQueue()
)
/*....**/
val task = urlSession.share().dataTaskWithRequest(urlRequest)
engineScope.launch {
task?.resume()
}
val response = responseReader.awaitResponse()
return response
}
}
Т.к заморозка вызывается до отправки в скоуп и вынесен на более низкий уровень взаимодействия, то проблема, с которой мы столкнулись ранее у нас решена, и исключения не возникает.
Теперь избавимся от атомарных ссылок. Для этого мы можем использовать следующее API:
private var chunks = Channel<ByteArray>(UNLIMITED)
//либо
private var chunksFlow = MutableStateFlow(ByteArray(0))
В случае channel модифицируем получение данных и отправку в ответе так:
suspend fun awaitResponse(): Response {
var array = ByteArray(0)
var response = rawResponse.await()
chunks.consumeEach {
array += it
}
response.content = array.string()
return response.share() // помним о заморозке
}
private fun updateChunks(data: NSData) {
val bytes = data.toByteArray()
scope.launch {
chunks.send(bytes)
}
}
В случае flow вот так:
suspend fun awaitResponse(): Response {
var chunks = ByteArray(0)
chunksFlow.onEach {
chunks += it
}.launchIn(scope)
val response = rawResponse.await()
response.content = chunks.string()
return response.share()
}
private fun updateChunks(data: NSData) {
val bytes = data.toByteArray().share()
chunksFlow.tryEmit(bytes)
}
Кстати, MutableStateFlow является приемлемой альтернативой MutableLiveData, которую мы спокойно можем использовать в Kotlin Native.
При вызове на стороне общего модуля проблем у нас вообще не возникнет:
class MoviesListViewModel() : BaseViewModel(ioDispatcher) {
private val service = MoviesService.instance
val moviesList: MutableStateFlow<List<MoviesItem>> =
MutableStateFlow(emptyList())
fun loadMovies() {
scope.launch {
val result = service.loadMovies()
moviesList.value = result.content?.results ?: arrayListOf()
}
}
Но вот прямо на стороне iOS (нативного приложения) вызов флоу выглядит странновато:
flow.collect(collector: <#T##Kotlinx_coroutines_coreFlowCollector#>,
completionHandler: <#T##(KotlinUnit?, Error?) -> Void#>)
Необходимо реализовать специальный коллектор, с помощью которого собирать приходящие значения:
class Collector<T>: Kotlinx_coroutines_coreFlowCollector {
let callback:(T) -> Void
init(callback: @escaping (T) -> Void) {
self.callback = callback
}
func emit(value: Any?, completionHandler: @escaping (KotlinUnit?, Error?) -> Void) {
// do whatever you what with the emitted value
callback(value as! T)
//Значения иногда теряются
completionHandler(KotlinUnit(), nil)
}
}
Однако, если сконфигурировать обработку неверно, сигналы могут теряться.
Поэтому на стороне IOS полезно сделать вот такую обработку Flow с помощью расширения-обертки на стороне общего модуля:
class AnyFlow<T>(source: Flow<T>): Flow<T> by source {
fun collect(onEach: (T) -> Unit, onCompletion: (cause: Throwable?) -> Unit): Cancellable {
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main)
scope.launch {
try {
collect {
onEach(it)
}
onCompletion(null)
} catch (e: Throwable) {
onCompletion(e)
}
}
return object : Cancellable {
override fun cancel() {
scope.cancel()
}
}
}
}
Получаемый поток можно слушать, как обычный suspend на стороне iOS:
someFlow().collect { value->
// do with value
} onCompletion {
print("Completed")
}
Получилось насыщенно. Какие-то вещи удобнее делать с помощью Kotlin Coroutines, какие-то проще делать без корутин, но нативным API Kotlin Native.
Остается сравнить с тем, что у нас появилось в новой модели управления памятью, о чем смотрите в следующей части.
И полезные ссылки:
Kotlin Multiplatform. Advanced multithreading by Anna Zharkova | KotLand
https://betterprogramming.pub/using-kotlin-flow-in-swift-3e7b53f559b6
https://github.com/JetBrains/kotlin-native
https://github.com/JetBrains/kotlin-native/blob/master/IMMUTABILITY.md
https://github.com/Kotlin/kotlinx.coroutines/issues/462
https://github.com/anioutkazharkova/kotlin_native_network_client
https://kmpdocs.suparnatural.com/concurrency/#worker
https://elizarov.medium.com/the-reason-to-avoid-globalscope-835337445abc