Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
В предыдущей статье Reactive Spring ABAC Security: безопасность уровня Enterprise впервые и крайне кратко представлен описанный в данной статье подход. Пришло время детально раскрыть нюансы и глубину потенциала нового подхода, когда основным протоколом для взаимодействия браузера и мобильных приложений с сервисами Spring Cloud выступает WebSocket.
Аннотация
Spring Cloud Gateway официально служит следующим целям: слоем безопасности, маршрутизатором, балансировщиком, провайдером протоколов сжатия. Gateway ничего не знает о передаваемых данных и тем более о бизнес-логике – это инфраструктурный сервис.
Связь Gateway с WebSocket обуславливает здравый смысл. Если перенести логику WebSocket в Gateway, то при высоких нагрузках потребуется ненамного увеличить количество подов с одной стороны, а с другой если через WebSocket работают не только браузеры, но и мобильные приложения – то такой подход становится естественным.
В статье описываются преимущества данного подхода, детально перечисляются нюансы реализации и определяются общие перспективы дальнейшего развития. Но основная идея сводится к уменьшению стоимости разработки за счёт переиспользования базовых технологий для всего необходимого функционала.
Введение
В эпоху рассвета мобильных приложений на Android, в то время, когда в 2009 году Samsung объявил об отказе от Symbian и переходе на Android (iOS ещё не было) – WebSocket в том же году был ещё в виде черновика, т.е. исторически мобильные приложения не застали эту технологию и развитие ушло своим чередом.
Кроме того, существовала проблема при переключении телефона с одной соты на другую– также менялся и IP-адрес телефона, что прерывало все установленные соединения. Данный фактор был не в пользу WebSocket, который в исходной версиине умел переподключаться при обрыве связи. Сейчас, качество связи стала очень стабильной, а IP-адреса внутри сети мобильного оператора привязаны к телефону. Можно сказать появились тепличные условия для восхождения WebSocket и почему бы и нет.
В представленном проекте Spring Cloud WebSocket Gateway, интеграция WebSocket не повлияла на работу и исходный код существующих сервисов, т.к. конвейер сообщений WebSocket в Restful прозрачно конвертирует запросы и ответы.
Преимущества использования конвейера WebSocket в Restful следующие:
Классический микросервисный подход Spring Cloud к организации обработки потока данных на стороне бэкенда как от браузера так и от мобильных приложений по WebSocket, когда сервисам не интересно откуда приходят http-запросы;
Единое понятие сессии: отключение всех соединений WebSocket пользователя однозначно сообщит о закрытии сессии как в браузере, так и в мобильном приложении (такое знание для многих бизнес-процессов критично);
Открытые соединения WebSocket точно определяют количество пользователей онлайн, в то время как в случае http-запросов онлайн считается по количеству запросов за последние полчаса;
Более не требуется тратить значительные ресурсы на организацию Push-уведомлений – полученные через WebSocket уведомлений ничем не отличаются визуально, при условии когда приложение запущено (что в подавляющем большинстве случаев приемлемо);
При высоких нагрузках WebSocket заметно быстрее HTTP и значительно меньше потребляет трафика, при этом, поддерживает полнодуплексный асинхронный обмен сообщениями в обе стороны;
Из коробки Origin-based cross-domain policy (политика безопасности на основе происхождения);
Прямым преимуществом использования WebSocket перед http-запросами в возможности обратной связи в реальном времени, что позволяет строить современные высококонкурентные реактивные интерфейсы в мобильных приложениях.
Если принять во внимание факт увеличения расхода батареи из-за постоянно открытого соединения WebSocket, то согласно GSM Arena с 2010 года плотность аккумуляторов к 2020 году выросла в 3 раза и при том же форм-факторе ёмкость батареи достигла 5000 мАч.
А если ещё учесть время на полную зарядку современных аккумуляторов от сети в 15 минут, то при таких темпах развития данный факт можно смело отнести к незначительным.
Конечно, Gateway продолжит маршрутизацию и балансировку http-запросов, т.к. останутся: авторизация пользователя, запросы выдающие большие объёмы данных, выгрузка файлов в браузер, а также интеграция внешних систем.
Рассматриваемый в статье проект spring-cloud-websocket-gateway позволяет снизить стоимость разработки и владения за счёт использования классического подхода http-запросов при работе с браузером и мобильными приложениями по протоколу WebSocket, а также эффективной утилизации сессий в момент окончания жизни jwt-токена. Представленный исходный код несмотря на небольшой размер и простоту реализации – эффективно выполняет все описанные в статье возможности.
Поддержка WebSocket в Android и iOS
В 2013 году один из разработчиков проекта Socket.io Naoyuki Kanezawa закомитил клиент WebSocket под Android, которая также активно дорабатывается до настоящего времени. А спустя 4 года появился клиент под iOS также с полной поддержкой WebSocket.
В 2014 году вышла в релиз спецификация JSR 356, поддержка которой привела к включению в Java Development Kit (JDK) пакета javax.websocket. В дальнейшем, непосредственно в Kotlin Standard Library была внедрена обвёртка вокруг DOM API с базовым классом WebSocket – который также можно использовать в Android приложениях.
Помимо нативных библиотек от команды Socket.io, существует более десятка сторонних с полной имплементацией протокола WebSocket по спецификации RFC 6455 и большинство из них активно поддерживаются довольно крупными сообществами.
Наиболее распространённая практика использования WebSocket в среде iOS – это библиотека SocketRocket от Facebook, а в среде Android – библиотека Java-WebSocket от opensource разработчика Nathan Rajlich, создавший проект в 2010 году (скорее всего для нужд одного из клиентов фондовой биржи NASDAQ).
WebSocket профессионально присутствует в мобильных приложениях более 10 лет, т.к. на глобальном уровне требование к доставке данных в реальном режиме времени постоянно расширяется с каждым новым типом данных из года в год.
Основные аспекты реализации
Браузер или мобильное приложение отправляет и принимает запросы по WebSocket в обвёртке:
data class MessageWrapper(
val type: HttpMethod = HttpMethod.GET,
val baseUrl: String = StringUtils.EMPTY,
val uri: String = StringUtils.EMPTY,
val body: JsonNode? = null
)
Атрибуты образуют http-запрос к сервисам:
type – тип GET, HEAD, POST, PUT, PATCH, DELETE, TRACE (по умолчанию GET)
baseUrl – имя сервиса, например: http://account-service
uri – метод контроллера и query-запрос, например: findAllPaged?sort=id:desc
body – json запроса при типе запроса POST, PUT или PATCH
при формировании ответа body заменяется ответом с сервиса: message.copy(body = it), а baseUrl и uri служат идентификаторами запроса в обработчике WebSocket в браузере или в мобильном приложении.
Функция преобразования запросов WebSocket в Restful и обратно:
fun handling(message: MessageWrapper, username: String) {
val webClient = Beans.of(WebClient.Builder::class.java).baseUrl(message.baseUrl).build()
val response = when (message.type) {
HttpMethod.GET -> webClient.get().uri(message.uri).retrieve()
HttpMethod.POST -> webClient.post().uri(message.uri).body(BodyInserters.fromValue(message.body)).retrieve()
HttpMethod.PUT -> webClient.put().uri(message.uri).body(BodyInserters.fromValue(message.body)).retrieve()
HttpMethod.DELETE -> webClient.delete().uri(message.uri).retrieve()
HttpMethod.PATCH -> webClient.patch().uri(message.uri).body(BodyInserters.fromValue(message.body)).retrieve()
HttpMethod.HEAD -> webClient.head().uri(message.uri).retrieve()
HttpMethod.OPTIONS -> webClient.options().uri(message.uri).retrieve()
HttpMethod.TRACE -> webClient.method(HttpMethod.TRACE).uri(message.uri).retrieve()
}
response
.onStatus({ status -> status.isError })
{ clientResponse ->
clientResponse.bodyToMono(ByteArrayResource::class.java)
.map { responseAnswer: ByteArrayResource ->
WebClientResponseException(
clientResponse.rawStatusCode(),
clientResponse.statusCode().name,
clientResponse.headers().asHttpHeaders(),
responseAnswer.byteArray,
Charsets.UTF_8
)
}
}
.bodyToMono(JsonNode::class.java).subscribe {
info { "Request[${message.baseUrl}${message.uri}] by user[$username] accepted" }
debug { it.toString() }
val sessionChain = clients.getIfPresent(username)
sessionChain?.sendMessage(message.copy(body = it))
}
}
Детальный обзор реализации
Рассмотрим публикацию точки входа для реактивного WebSocket для конвертации запросов из WebSocket в HTTP к сервисам и обратно:
@Component
@WebSocketEntryPoint("/wsf")
class WebSocketFactory(val kafkaPublisher: EventDrivenPublisher) : WebSocketHandler {
override fun handle(session: WebSocketSession) = session.handshakeInfo.principal
.cast(UsernamePasswordAuthenticationToken::class.java)
.flatMap { authToken: UsernamePasswordAuthenticationToken ->
val output = session.send(Flux.create {
authToken.credentials
clients.put(authToken.name, WebSocketSessionChain(
session = session, tokenHash = authToken.credentials as Long, chain = it))
})
val input = session.receive()
.map { obj: WebSocketMessage -> obj.payloadAsText.parseJson(MessageWrapper::class.java) }
.doOnNext { handling(it, authToken.name) }.then()
Mono.zip(input, output).then().doFinally { signal: SignalType ->
val sessionChain = clients.getIfPresent(authToken.name)!!
kafkaPublisher.publishDisconnect(
UserDisconnectEvent(authToken.name, sessionChain.tokenHash, false)
).subscribe {
clients.invalidate(authToken.name)
sessionChain.session.close()
info { "Connection close with signal[${signal.name}] and user[${authToken.name}]" }
}
}
kafkaPublisher.publishConnect(
UserConnectEvent(authToken.name, authToken.authorities.map { it.authority })
)
}
Кэш clients на базе библиотеки Caffeine, имеет ограничение по времени и ограничение по размеру количества открытых соединений WebSocket, а при завершении сессии по таймауту посылает в Kafka событие:
val clients = Caffeine.newBuilder()
.maximumSize(Beans.getProperty(Constants.GATEWAY_CACHE_SIZE, Long::class.java, 10000))
.expireAfterAccess(
Beans.getProperty(Constants.GATEWAY_CACHE_ACCESS, Long::class.java, 1800000),
TimeUnit.MILLISECONDS
)
.removalListener { key: String?, value: WebSocketSessionChain?, cause: RemovalCause ->
if (cause.wasEvicted() && ObjectUtils.isNotEmpty(key)) {
kafkaPublisher.publishDisconnect(
UserDisconnectEvent(key, value!!.tokenHash, true)
).subscribe {
value.session.close()
info { "WebSocket disconnected by timeout with user[$key]" }
}
}
}.build<String, WebSocketSessionChain>()
События Kafka отправляются через сервис EventDriverPublisher, где представлены методы по каждому типу события – это связано с необходимостью типизации модели для сериализации, т.к. для сжатия используется библиотека Apache Avro. Соответственно, все модели Kafka предварительно генерируются плагином avro-maven-plugin.
Описание модели WebSocketEvent:
{
"namespace": "io.github.sevenparadigms.gateway.kafka.model",
"type": "record",
"name": "WebSocketEvent",
"fields": [
{
"name": "username",
"type": {
"type": "string",
"default": "null"
}
},
{
"name": "baseUrl",
"type": "string"
},
{
"name": "uri",
"type": "string"
},
{
"name": "body",
"type": "string"
}
]
}
Реализация методов класса EventDrivenPublisher:
private val producerProps: Map<String, Any> = mapOf(
BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.broker,
KEY_SERIALIZER_CLASS_CONFIG to kafkaProperties.serializer,
VALUE_SERIALIZER_CLASS_CONFIG to kafkaProperties.serializer,
SCHEMA_REGISTRY_URL_CONFIG to kafkaProperties.schemaRegistryUrl,
VALUE_SUBJECT_NAME_STRATEGY to RecordNameStrategy::class.java,
AUTO_REGISTER_SCHEMAS to true
)
fun <T> publish(topic: String, event: T, key: String = UUID.randomUUID().toString()) =
KafkaSender.create<String, T>(SenderOptions.create(producerProps)).createOutbound()
.send(Mono.just(ProducerRecord(topic, key, event)))
.then()
.doOnSuccess { info { "Successfully sent to topic[$topic]: $event with id=$key" } }
fun publishConnect(event: UserConnectEvent) = publish(kafkaProperties.userConnectTopic, event)
fun publishDisconnect(event: UserDisconnectEvent): Mono<Void> {
eventPublisher.publishEvent(RevokeTokenEvent(hash = event.hash, source = event.username))
return publish(kafkaProperties.userDisconnectTopic, event)
}
Конфигурация Kafka в application.yml:
kafka:
web-socket-topic: websocket-transport
user-connect-topic: user-connect-event
user-disconnect-topic: user-disconnect-event
broker: localhost:9092
group-id: websocket-gateway
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema-registry-url: http://localhost:8081
Рассмотрим части кода точки входа для WebSocket :
val output = session.send(Flux.create {
clients.put(authToken.name, WebSocketSessionChain(session, it))
})
то видно, что соединение WebSocket через объект it = FluxSink сохраняется в кэше clients для дальнейшей отправки сообщений пользователю в браузер или в мобильное приложение в структуре MessageWrapper по его идентификатору, который представлен как name.
Mono.zip(input, output).then().doFinally { signal: SignalType ->
val sessionChain = clients.getIfPresent(authToken.name)!!
kafkaPublisher.publishDisconnect(
UserDisconnectEvent(authToken.name, sessionChain.tokenHash, false)
).subscribe {
clients.invalidate(authToken.name)
sessionChain.session.close()
info { "Connection close with signal[${signal.name}] and user[${authToken.name}]" }
}
}
конструкция doFinally { } позволяет перехватить момент закрытия соединения WebSocket, корректно завершить сессию и отправить событие Kafka.
В Gateway включена библиотека reactive-spring-abac-security и чтобы подключится к WebSocket необходимо передать в заголовке jwt-токен, который для производительности валидируется только по времени и по подписи публичным ключом.
spring:
security:
jwt:
public-key:
MIIDeTCCAmGgAwIBAgIEFzIFujANBgkqhkiG9w0BAQsFADBtMQswCQYDVQQGEwJG
UjEQMA4GA1UECBMHVW5rbm93bjEOMAwGA1UEBxMFUGFyaXMxFzAVBgNVBAoTDlNl
dmVucGFyYWRpZ21zMQ8wDQYDVQQLEwZnaXRodWIxEjAQBgNVBAMTCUxhbyBUc2lu
ZzAeFw0yMjA0MDMxODQyMDRaFw0zMjAzMzExODQyMDRaMG0xCzAJBgNVBAYTAkZS
MRAwDgYDVQQIEwdVbmtub3duMQ4wDAYDVQQHEwVQYXJpczEXMBUGA1UEChMOU2V2
ZW5wYXJhZGlnbXMxDzANBgNVBAsTBmdpdGh1YjESMBAGA1UEAxMJTGFvIFRzaW5n
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAjtdx8tYDDRFUpw3oJdFx
Avcho5ytRQt1PZUymRoioO28RO9mXdrhJgKXA2MFlmjnzD/yRwR/PqZcneKz7rKx
kN14HYQNxgKrUFNZwtAtePiTAcAPy4NqtVeE8pS5djQ+bIqlpnJUhYvtK1vDlMkS
KUJr/N2/sRAQcH8fQiPG5vwI+MpHjWjqjjM+ycslPWqQp2QguaqxMd4IAjL8fZnP
2LGyCZdZCRbtu3TknW+zmgVMF9hiEdtUX677cBfamnslpCUe4ACI5aziwua5GQZV
DwfaFf6kOAtKcEa7CUy3axCs82KVa3lfPW/b8ALWDllbjYLZWVwNfvR5bKFFg2tk
GQIDAQABoyEwHzAdBgNVHQ4EFgQU29M6xK0D1NAvRRE1MApZv4Qr0l8wDQYJKoZI
hvcNAQELBQADggEBADCIzI/jC+3dXnhdca2ozwH6Oq46coT61tmLnCmlpTvE352w
g/FhpujILUOIwaShKjgIiBA1u1DYrZM1m9BoZ6/YuXa9OYpCK0jamuug4Vnz0bIm
fQIQPfCMJcouwc4pCm8jAzWSo8xfTJ/yhUnqt7/NQkGuSWsHVZN9O1leKVa2xTEU
C5APTpX7Rj2+mU8c/fDzFA1m+LXYp2T3dbi3yVOTzSwRkE84sE18fdgRuvJfpmxL
W3BuVKQ9/1bzpcTK1onKw7WNqrjCoO37G+d42IeDzXMdDjyI3POYYy8g/o//sp6O
JhhMDEwt2aEAKEVlQxYzgMBn8HeUQrHSeX+ML8Q=
В Gateway также включена библиотека kuberbetes-embedded-hazelcast, которая инициализируется в Spring как CacheManager и запускает сервер Hazelcast как embedded вместе с сервисом. Уникальной особенностью данной библиотеки является возможность задания максимального размера кэша, что отсутствует в исходном Hazelcast.
Параметры кэша прописываются в application.yml:
spring.cache:
jwt.expireAfterAccess: 500 # milliseconds (exclusive)
jwt.expireAfterWrite: 1000 # milliseconds (exclusive)
jwt.maximumSize: 10000
Хеш токена кэшируется в кластере Hazelcast на уровне подов одного сервиса на время жизни токена и при повторном обращении результат валидации возвращается из кэша. Токен отзывается по завершении сессии пользователя при котором инициируется событие Spring: RevokeRokenEvent и в запись кэша токена вносится метка о деактивации токена.
Внешняя интеграция в WebSocket
Для асинхронной отправки сообщений в WebSocket через событие Kafka добавлен роутер:
@Configuration
class RoutesConfiguration(private val kafkaHandler: KafkaHandler) {
@Bean
fun route(): RouterFunction<ServerResponse> = router {
("/kafka").nest {
accept(MediaType.APPLICATION_JSON).nest {
POST("", kafkaHandler::publish)
}
}
}
}
который ожидает модель EventWrapper:
data class EventWrapper(
val topic: String = StringUtils.EMPTY,
val body: JsonNode = JsonUtils.objectNode()
)
и вызывает метод publish:
@Component
class KafkaHandler(private val kafkaPublisher: EventDrivenPublisher) {
fun publish(request: ServerRequest) = request.bodyToMono(EventWrapper::class.java)
.flatMap { kafkaPublisher.publish(it.topic, it.body.jsonToObject(WebSocketEvent::class.java)) }
.flatMap { ServerResponse.ok().build() }
.doOnError { error("Exception while trying to process event: " + it.message) }
}
после чего, в произвольный топик Kafka отправляет сообщение из body.
Если мы хотим отправить сообщение в WebSocket определённому пользователю, тогда в topic указывается значение websocket-transport, а в body описывается модель MessageWrapper:
{
"topic": "websocket-transport",
"body": {
"username": userId,
"baseUrl": "http://account-service",
"uri": "cash-out",
"body": { "value": -100 }
}
}
В классе KafkaConsumerConfiguration создана подписка на топик websocket-transport, которая перенаправляет события в WebSocket:
private val receiverOptions = ReceiverOptions.create<String, WebSocketEvent>(
mapOf(
BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.broker,
GROUP_ID_CONFIG to kafkaProperties.groupId,
KEY_DESERIALIZER_CLASS_CONFIG to kafkaProperties.deserializer,
VALUE_DESERIALIZER_CLASS_CONFIG to kafkaProperties.deserializer,
AUTO_OFFSET_RESET_CONFIG to "earliest",
ENABLE_AUTO_COMMIT_CONFIG to true,
SCHEMA_REGISTRY_URL_CONFIG to kafkaProperties.schemaRegistryUrl,
VALUE_SUBJECT_NAME_STRATEGY to RecordNameStrategy::class.java,
SPECIFIC_AVRO_READER_CONFIG to true
)
).commitInterval(Duration.ZERO)
.commitBatchSize(0)
.subscription(setOf(kafkaProperties.webSocketTopic))
@Bean
fun listenWebSocketEvent(webSocketFactory: WebSocketFactory) =
KafkaReceiver.create(receiverOptions)
.receive()
.concatMap { record ->
Mono.fromRunnable<Void> {
val it = record.value()
info { "Transfer kafka message to WebSocket: $it" }
webSocketFactory.get(it.username!!)?.sendMessage(it.copyTo(MessageWrapper()))
}
}.subscribe()
В качестве итога обзора небольшого, но эффективного кода можно также показать то небольшое количество подключаемых зависимостей в файле сборки pom.xml:
<spring-cloud.version>2021.0.1</spring-cloud.version>
<abac-security.version>1.4.1</abac-security.version>
<reactor-kafka.version>1.3.11</reactor-kafka.version>
<avro.version>1.11.0</avro.version>
<kafka-avro.version>7.1.1</kafka-avro.version>
<hazelcast.version>5.1.1</hazelcast.version>
<embedded-hazelcast.version>1.2.0</embedded-hazelcast.version>
Резюме
Простая идея интеграции в действующий Spring Cloud Gateway конвейера WebSocket в Restful уже существующих микросервисов и обратно – открывают множество возможностей и перспектив. В частности, отказа от инфраструктуры вокруг Push-уведомлений.
При этом, Push-уведомления имеют ряд неуправляемых проблем, такие как произвольные задержки транспорта, внеочерёдность уведомлений, переполнение пула очереди.
Кроме того, существующий фронт в браузере и в мобильных приложениях необязательно сразу переписывать под использование конвейера WebSocket, ведь Gateway продолжает свою работу по маршрутизации и балансировке.
Сначала, на фронт можно добавить асинхронную фабрику обработки входящих сообщений для реактивной адаптации интерфейса, а потом достаточно подменить используемую сетевую библиотеку на свою с тем же API, чтобы на заднем фоне упаковать запросы в обвёртку и отправить через WebSocket.
WebSocket на сегодняшний день самый популярный протокол реального времени, ставшим стандартом де-факто в мобильных приложениях, таких как финансовые инструменты, социальные сети, навигация по местоположению и конечно компьютерные игры.
Архитектура WebSocket соответствует модели управляемых событий Event-Driven, а код оптимизирован для минимизации overhead по трафику и latency при передаче данных по сети. В настоящее время широкополосных каналов Интернет – WebSocket позволяет в полной мере реализовать реактивные интерфейсы, «оживив» таким образом мобильное приложение в руках пользователя.