Reactive Spring Cloud Gateway: конвейер WebSocket в Restful и обратно

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

В предыдущей статье Reactive Spring ABAC Security: безопасность уровня Enterprise впервые и крайне кратко представлен описанный в данной статье подход. Пришло время детально раскрыть нюансы и глубину потенциала нового подхода, когда основным протоколом для взаимодействия браузера и мобильных приложений с сервисами Spring Cloud выступает WebSocket.

Аннотация

Spring Cloud Gateway официально служит следующим целям: слоем безопасности, маршрутизатором, балансировщиком, провайдером протоколов сжатия. Gateway ничего не знает о передаваемых данных и тем более о бизнес-логике – это инфраструктурный сервис.

Связь Gateway с WebSocket обуславливает здравый смысл. Если перенести логику WebSocket в Gateway, то при высоких нагрузках потребуется ненамного увеличить количество подов с одной стороны, а с другой если через WebSocket работают не только браузеры, но и мобильные приложения – то такой подход становится естественным.

В статье описываются преимущества данного подхода, детально перечисляются нюансы реализации и определяются общие перспективы дальнейшего развития. Но основная идея сводится к уменьшению стоимости разработки за счёт переиспользования базовых технологий для всего необходимого функционала.

Введение

В эпоху рассвета мобильных приложений на Android, в то время, когда в 2009 году Samsung объявил об отказе от Symbian и переходе на Android (iOS ещё не было) – WebSocket в том же году был ещё в виде черновика, т.е. исторически мобильные приложения не застали эту технологию и развитие ушло своим чередом.

Кроме того, существовала проблема при переключении телефона с одной соты на другую– также менялся и IP-адрес телефона, что прерывало все установленные соединения. Данный фактор был не в пользу WebSocket, который в исходной версиине умел переподключаться при обрыве связи. Сейчас, качество связи стала очень стабильной, а IP-адреса внутри сети мобильного оператора привязаны к телефону. Можно сказать появились тепличные условия для восхождения WebSocket и почему бы и нет.

В представленном проекте Spring Cloud WebSocket Gateway, интеграция WebSocket не повлияла на работу и исходный код существующих сервисов, т.к. конвейер сообщений WebSocket в Restful прозрачно конвертирует запросы и ответы.

Преимущества использования конвейера WebSocket в Restful следующие:

  1. Классический микросервисный подход Spring Cloud к организации обработки потока данных на стороне бэкенда как от браузера так и от мобильных приложений по WebSocket, когда сервисам не интересно откуда приходят http-запросы;

  2. Единое понятие сессии: отключение всех соединений WebSocket пользователя однозначно сообщит о закрытии сессии как в браузере, так и в мобильном приложении (такое знание для многих бизнес-процессов критично);

  3. Открытые соединения WebSocket точно определяют количество пользователей онлайн, в то время как в случае http-запросов онлайн считается по количеству запросов за последние полчаса;

  4. Более не требуется тратить значительные ресурсы на организацию Push-уведомлений – полученные через WebSocket уведомлений ничем не отличаются визуально, при условии когда приложение запущено (что в подавляющем большинстве случаев приемлемо);

  5. При высоких нагрузках WebSocket заметно быстрее HTTP и значительно меньше потребляет трафика, при этом, поддерживает полнодуплексный асинхронный обмен сообщениями в обе стороны;

  6. Из коробки Origin-based cross-domain policy (политика безопасности на основе происхождения);

  7. Прямым преимуществом использования WebSocket перед http-запросами в возможности обратной связи в реальном времени, что позволяет строить современные высококонкурентные реактивные интерфейсы в мобильных приложениях.

Если принять во внимание факт увеличения расхода батареи из-за постоянно открытого соединения WebSocket, то согласно GSM Arena с 2010 года плотность аккумуляторов к 2020 году выросла в 3 раза и при том же форм-факторе ёмкость батареи достигла 5000 мАч.

А если ещё учесть время на полную зарядку современных аккумуляторов от сети в 15 минут, то при таких темпах развития данный факт можно смело отнести к незначительным.

Конечно, Gateway продолжит маршрутизацию и балансировку http-запросов, т.к. останутся: авторизация пользователя, запросы выдающие большие объёмы данных, выгрузка файлов в браузер, а также интеграция внешних систем.

Рассматриваемый в статье проект spring-cloud-websocket-gateway позволяет снизить стоимость разработки и владения за счёт использования классического подхода http-запросов при работе с браузером и мобильными приложениями по протоколу WebSocket, а также эффективной утилизации сессий в момент окончания жизни jwt-токена. Представленный исходный код несмотря на небольшой размер и простоту реализации – эффективно выполняет все описанные в статье возможности.

Поддержка WebSocket в Android и iOS

В 2013 году один из разработчиков проекта Socket.io Naoyuki Kanezawa закомитил клиент WebSocket под Android, которая также активно дорабатывается до настоящего времени. А спустя 4 года появился клиент под iOS также с полной поддержкой WebSocket.

В 2014 году вышла в релиз спецификация JSR 356, поддержка которой привела к включению в Java Development Kit (JDK) пакета javax.websocket. В дальнейшем, непосредственно в Kotlin Standard Library была внедрена обвёртка вокруг DOM API с базовым классом WebSocket – который также можно использовать в Android приложениях.

Помимо нативных библиотек от команды Socket.io, существует более десятка сторонних с полной имплементацией протокола WebSocket по спецификации RFC 6455 и большинство из них активно поддерживаются довольно крупными сообществами.

Наиболее распространённая практика использования WebSocket в среде iOS – это библиотека SocketRocket от Facebook, а в среде Android – библиотека Java-WebSocket от opensource разработчика Nathan Rajlich, создавший проект в 2010 году (скорее всего для нужд одного из клиентов фондовой биржи NASDAQ).

WebSocket профессионально присутствует в мобильных приложениях более 10 лет, т.к. на глобальном уровне требование к доставке данных в реальном режиме времени постоянно расширяется с каждым новым типом данных из года в год.

Основные аспекты реализации

Браузер или мобильное приложение отправляет и принимает запросы по WebSocket в обвёртке:

data class MessageWrapper(
    val type: HttpMethod = HttpMethod.GET,
    val baseUrl: String = StringUtils.EMPTY,
    val uri: String = StringUtils.EMPTY,
    val body: JsonNode? = null
)

Атрибуты образуют http-запрос к сервисам:

  • type – тип GET, HEAD, POST, PUT, PATCH, DELETE, TRACE (по умолчанию GET)

  • baseUrl – имя сервиса, например: http://account-service

  • uri – метод контроллера и query-запрос, например: findAllPaged?sort=id:desc

  • body – json запроса при типе запроса POST, PUT или PATCH

при формировании ответа body заменяется ответом с сервиса: message.copy(body = it), а baseUrl и uri служат идентификаторами запроса в обработчике WebSocket в браузере или в мобильном приложении.

Функция преобразования запросов WebSocket в Restful и обратно:

fun handling(message: MessageWrapper, username: String) {
    val webClient = Beans.of(WebClient.Builder::class.java).baseUrl(message.baseUrl).build()
    val response = when (message.type) {
        HttpMethod.GET -> webClient.get().uri(message.uri).retrieve()
        HttpMethod.POST -> webClient.post().uri(message.uri).body(BodyInserters.fromValue(message.body)).retrieve()
        HttpMethod.PUT -> webClient.put().uri(message.uri).body(BodyInserters.fromValue(message.body)).retrieve()
        HttpMethod.DELETE -> webClient.delete().uri(message.uri).retrieve()
        HttpMethod.PATCH -> webClient.patch().uri(message.uri).body(BodyInserters.fromValue(message.body)).retrieve()
        HttpMethod.HEAD -> webClient.head().uri(message.uri).retrieve()
        HttpMethod.OPTIONS -> webClient.options().uri(message.uri).retrieve()
        HttpMethod.TRACE -> webClient.method(HttpMethod.TRACE).uri(message.uri).retrieve()
    }
    response
        .onStatus({ status -> status.isError })
        { clientResponse ->
            clientResponse.bodyToMono(ByteArrayResource::class.java)
                .map { responseAnswer: ByteArrayResource ->
                    WebClientResponseException(
                        clientResponse.rawStatusCode(),
                        clientResponse.statusCode().name,
                        clientResponse.headers().asHttpHeaders(),
                        responseAnswer.byteArray,
                        Charsets.UTF_8
                    )
                }
        }
        .bodyToMono(JsonNode::class.java).subscribe {
            info { "Request[${message.baseUrl}${message.uri}] by user[$username] accepted" }
            debug { it.toString() }
            val sessionChain = clients.getIfPresent(username)
            sessionChain?.sendMessage(message.copy(body = it))
        }
}

Детальный обзор реализации

Рассмотрим публикацию точки входа для реактивного WebSocket для конвертации запросов из WebSocket в HTTP к сервисам и обратно:

@Component
@WebSocketEntryPoint("/wsf")
class WebSocketFactory(val kafkaPublisher: EventDrivenPublisher) : WebSocketHandler {
    override fun handle(session: WebSocketSession) = session.handshakeInfo.principal
    .cast(UsernamePasswordAuthenticationToken::class.java)
    .flatMap { authToken: UsernamePasswordAuthenticationToken ->
        val output = session.send(Flux.create {
            authToken.credentials
            clients.put(authToken.name, WebSocketSessionChain(
                session = session, tokenHash = authToken.credentials as Long, chain = it))
        })
        val input = session.receive()
            .map { obj: WebSocketMessage -> obj.payloadAsText.parseJson(MessageWrapper::class.java) }
            .doOnNext { handling(it, authToken.name) }.then()

        Mono.zip(input, output).then().doFinally { signal: SignalType ->
            val sessionChain = clients.getIfPresent(authToken.name)!!
            kafkaPublisher.publishDisconnect(
                UserDisconnectEvent(authToken.name, sessionChain.tokenHash, false)
            ).subscribe {
                clients.invalidate(authToken.name)
                sessionChain.session.close()
                info { "Connection close with signal[${signal.name}] and user[${authToken.name}]" }
            }
        }
        kafkaPublisher.publishConnect(
            UserConnectEvent(authToken.name, authToken.authorities.map { it.authority })
        )
    }

Кэш clients на базе библиотеки Caffeine, имеет ограничение по времени и ограничение по размеру количества открытых соединений WebSocket, а при завершении сессии по таймауту посылает в Kafka событие:

val clients = Caffeine.newBuilder()
    .maximumSize(Beans.getProperty(Constants.GATEWAY_CACHE_SIZE, Long::class.java, 10000))
    .expireAfterAccess(
        Beans.getProperty(Constants.GATEWAY_CACHE_ACCESS, Long::class.java, 1800000),
        TimeUnit.MILLISECONDS
    )
    .removalListener { key: String?, value: WebSocketSessionChain?, cause: RemovalCause ->
        if (cause.wasEvicted() && ObjectUtils.isNotEmpty(key)) {
            kafkaPublisher.publishDisconnect(
                UserDisconnectEvent(key, value!!.tokenHash, true)
            ).subscribe {
                value.session.close()
                info { "WebSocket disconnected by timeout with user[$key]" }
            }
        }
    }.build<String, WebSocketSessionChain>()

События Kafka отправляются через сервис EventDriverPublisher, где представлены методы по каждому типу события – это связано с необходимостью типизации модели для сериализации, т.к. для сжатия используется библиотека Apache Avro. Соответственно, все модели Kafka предварительно генерируются плагином avro-maven-plugin.

Описание модели WebSocketEvent:

{
  "namespace": "io.github.sevenparadigms.gateway.kafka.model",
  "type": "record",
  "name": "WebSocketEvent",
  "fields": [
    {
      "name": "username",
      "type": {
        "type": "string",
        "default": "null"
      }
    },
    {
      "name": "baseUrl",
      "type": "string"
    },
    {
      "name": "uri",
      "type": "string"
    },
    {
      "name": "body",
      "type": "string"
    }
  ]
}

Реализация методов класса EventDrivenPublisher:

private val producerProps: Map<String, Any> = mapOf(
    BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.broker,
    KEY_SERIALIZER_CLASS_CONFIG to kafkaProperties.serializer,
    VALUE_SERIALIZER_CLASS_CONFIG to kafkaProperties.serializer,
    SCHEMA_REGISTRY_URL_CONFIG to kafkaProperties.schemaRegistryUrl,
    VALUE_SUBJECT_NAME_STRATEGY to RecordNameStrategy::class.java,
    AUTO_REGISTER_SCHEMAS to true
)

fun <T> publish(topic: String, event: T, key: String = UUID.randomUUID().toString()) =
    KafkaSender.create<String, T>(SenderOptions.create(producerProps)).createOutbound()
        .send(Mono.just(ProducerRecord(topic, key, event)))
        .then()
        .doOnSuccess { info { "Successfully sent to topic[$topic]: $event with id=$key" }  }


fun publishConnect(event: UserConnectEvent) = publish(kafkaProperties.userConnectTopic, event)
fun publishDisconnect(event: UserDisconnectEvent): Mono<Void> {
    eventPublisher.publishEvent(RevokeTokenEvent(hash = event.hash, source = event.username))
    return publish(kafkaProperties.userDisconnectTopic, event)
}

Конфигурация Kafka в application.yml:

kafka:
  web-socket-topic: websocket-transport
  user-connect-topic: user-connect-event
  user-disconnect-topic: user-disconnect-event
  broker: localhost:9092
  group-id: websocket-gateway
  serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
  deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
  schema-registry-url: http://localhost:8081

Рассмотрим части кода точки входа для WebSocket :

val output = session.send(Flux.create {
            clients.put(authToken.name, WebSocketSessionChain(session, it))
        })

то видно, что соединение WebSocket через объект it = FluxSink сохраняется в кэше clients для дальнейшей отправки сообщений пользователю в браузер или в мобильное приложение в структуре MessageWrapper по его идентификатору, который представлен как name.

Mono.zip(input, output).then().doFinally { signal: SignalType ->
            val sessionChain = clients.getIfPresent(authToken.name)!!
            kafkaPublisher.publishDisconnect(
                UserDisconnectEvent(authToken.name, sessionChain.tokenHash, false)
            ).subscribe {
                clients.invalidate(authToken.name)
                sessionChain.session.close()
                info { "Connection close with signal[${signal.name}] and user[${authToken.name}]" }
            }
        }

конструкция doFinally { } позволяет перехватить момент закрытия соединения WebSocket, корректно завершить сессию и отправить событие Kafka.

В Gateway включена библиотека reactive-spring-abac-security и чтобы подключится к WebSocket необходимо передать в заголовке jwt-токен, который для производительности валидируется только по времени и по подписи публичным ключом.

spring:
  security:
    jwt:
      public-key:
        MIIDeTCCAmGgAwIBAgIEFzIFujANBgkqhkiG9w0BAQsFADBtMQswCQYDVQQGEwJG
        UjEQMA4GA1UECBMHVW5rbm93bjEOMAwGA1UEBxMFUGFyaXMxFzAVBgNVBAoTDlNl
        dmVucGFyYWRpZ21zMQ8wDQYDVQQLEwZnaXRodWIxEjAQBgNVBAMTCUxhbyBUc2lu
        ZzAeFw0yMjA0MDMxODQyMDRaFw0zMjAzMzExODQyMDRaMG0xCzAJBgNVBAYTAkZS
        MRAwDgYDVQQIEwdVbmtub3duMQ4wDAYDVQQHEwVQYXJpczEXMBUGA1UEChMOU2V2
        ZW5wYXJhZGlnbXMxDzANBgNVBAsTBmdpdGh1YjESMBAGA1UEAxMJTGFvIFRzaW5n
        MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAjtdx8tYDDRFUpw3oJdFx
        Avcho5ytRQt1PZUymRoioO28RO9mXdrhJgKXA2MFlmjnzD/yRwR/PqZcneKz7rKx
        kN14HYQNxgKrUFNZwtAtePiTAcAPy4NqtVeE8pS5djQ+bIqlpnJUhYvtK1vDlMkS
        KUJr/N2/sRAQcH8fQiPG5vwI+MpHjWjqjjM+ycslPWqQp2QguaqxMd4IAjL8fZnP
        2LGyCZdZCRbtu3TknW+zmgVMF9hiEdtUX677cBfamnslpCUe4ACI5aziwua5GQZV
        DwfaFf6kOAtKcEa7CUy3axCs82KVa3lfPW/b8ALWDllbjYLZWVwNfvR5bKFFg2tk
        GQIDAQABoyEwHzAdBgNVHQ4EFgQU29M6xK0D1NAvRRE1MApZv4Qr0l8wDQYJKoZI
        hvcNAQELBQADggEBADCIzI/jC+3dXnhdca2ozwH6Oq46coT61tmLnCmlpTvE352w
        g/FhpujILUOIwaShKjgIiBA1u1DYrZM1m9BoZ6/YuXa9OYpCK0jamuug4Vnz0bIm
        fQIQPfCMJcouwc4pCm8jAzWSo8xfTJ/yhUnqt7/NQkGuSWsHVZN9O1leKVa2xTEU
        C5APTpX7Rj2+mU8c/fDzFA1m+LXYp2T3dbi3yVOTzSwRkE84sE18fdgRuvJfpmxL
        W3BuVKQ9/1bzpcTK1onKw7WNqrjCoO37G+d42IeDzXMdDjyI3POYYy8g/o//sp6O
        JhhMDEwt2aEAKEVlQxYzgMBn8HeUQrHSeX+ML8Q=

В Gateway также включена библиотека kuberbetes-embedded-hazelcast, которая инициализируется в Spring как CacheManager и запускает сервер Hazelcast как embedded вместе с сервисом. Уникальной особенностью данной библиотеки является возможность задания максимального размера кэша, что отсутствует в исходном Hazelcast.

Параметры кэша прописываются в application.yml:

spring.cache:
  jwt.expireAfterAccess: 500 # milliseconds (exclusive)
  jwt.expireAfterWrite: 1000 # milliseconds (exclusive)
  jwt.maximumSize: 10000

Хеш токена кэшируется в кластере Hazelcast на уровне подов одного сервиса на время жизни токена и при повторном обращении результат валидации возвращается из кэша. Токен отзывается по завершении сессии пользователя при котором инициируется событие Spring: RevokeRokenEvent и в запись кэша токена вносится метка о деактивации токена.

Внешняя интеграция в WebSocket

Для асинхронной отправки сообщений в WebSocket через событие Kafka добавлен роутер:

@Configuration
class RoutesConfiguration(private val kafkaHandler: KafkaHandler) {
    @Bean
    fun route(): RouterFunction<ServerResponse> = router {
        ("/kafka").nest {
            accept(MediaType.APPLICATION_JSON).nest {
                POST("", kafkaHandler::publish)
            }
        }
    }
}

который ожидает модель EventWrapper:

data class EventWrapper(
    val topic: String = StringUtils.EMPTY,
    val body: JsonNode = JsonUtils.objectNode()
)

и вызывает метод publish:

@Component
class KafkaHandler(private val kafkaPublisher: EventDrivenPublisher) {
    fun publish(request: ServerRequest) = request.bodyToMono(EventWrapper::class.java)
        .flatMap { kafkaPublisher.publish(it.topic, it.body.jsonToObject(WebSocketEvent::class.java)) }
        .flatMap { ServerResponse.ok().build() }
        .doOnError { error("Exception while trying to process event: " + it.message) }
}

после чего, в произвольный топик Kafka отправляет сообщение из body.

Если мы хотим отправить сообщение в WebSocket определённому пользователю, тогда в topic указывается значение websocket-transport, а в body описывается модель MessageWrapper:

{
    "topic": "websocket-transport",
    "body": {
        "username": userId,
        "baseUrl": "http://account-service",
        "uri": "cash-out",
        "body": { "value": -100 }
    }
}

В классе KafkaConsumerConfiguration создана подписка на топик websocket-transport, которая перенаправляет события в WebSocket:

private val receiverOptions = ReceiverOptions.create<String, WebSocketEvent>(
    mapOf(
        BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.broker,
        GROUP_ID_CONFIG to kafkaProperties.groupId,
        KEY_DESERIALIZER_CLASS_CONFIG to kafkaProperties.deserializer,
        VALUE_DESERIALIZER_CLASS_CONFIG to kafkaProperties.deserializer,
        AUTO_OFFSET_RESET_CONFIG to "earliest",
        ENABLE_AUTO_COMMIT_CONFIG to true,
        SCHEMA_REGISTRY_URL_CONFIG to kafkaProperties.schemaRegistryUrl,
        VALUE_SUBJECT_NAME_STRATEGY to RecordNameStrategy::class.java,
        SPECIFIC_AVRO_READER_CONFIG to true
    )
).commitInterval(Duration.ZERO)
    .commitBatchSize(0)
    .subscription(setOf(kafkaProperties.webSocketTopic))

@Bean
fun listenWebSocketEvent(webSocketFactory: WebSocketFactory) = 
  KafkaReceiver.create(receiverOptions)
    .receive()
    .concatMap { record ->
        Mono.fromRunnable<Void> {
            val it = record.value()
            info { "Transfer kafka message to WebSocket: $it" }
            webSocketFactory.get(it.username!!)?.sendMessage(it.copyTo(MessageWrapper()))
        }
    }.subscribe()

В качестве итога обзора небольшого, но эффективного кода можно также показать то небольшое количество подключаемых зависимостей в файле сборки pom.xml:

<spring-cloud.version>2021.0.1</spring-cloud.version>
<abac-security.version>1.4.1</abac-security.version>
<reactor-kafka.version>1.3.11</reactor-kafka.version>
<avro.version>1.11.0</avro.version>
<kafka-avro.version>7.1.1</kafka-avro.version>
<hazelcast.version>5.1.1</hazelcast.version>
<embedded-hazelcast.version>1.2.0</embedded-hazelcast.version>

Резюме

Простая идея интеграции в действующий Spring Cloud Gateway конвейера WebSocket в Restful уже существующих микросервисов и обратно – открывают множество возможностей и перспектив. В частности, отказа от инфраструктуры вокруг Push-уведомлений.

При этом, Push-уведомления имеют ряд неуправляемых проблем, такие как произвольные задержки транспорта, внеочерёдность уведомлений, переполнение пула очереди.

Кроме того, существующий фронт в браузере и в мобильных приложениях необязательно сразу переписывать под использование конвейера WebSocket, ведь Gateway продолжает свою работу по маршрутизации и балансировке.

Сначала, на фронт можно добавить асинхронную фабрику обработки входящих сообщений для реактивной адаптации интерфейса, а потом достаточно подменить используемую сетевую библиотеку на свою с тем же API, чтобы на заднем фоне упаковать запросы в обвёртку и отправить через WebSocket.

WebSocket на сегодняшний день самый популярный протокол реального времени, ставшим стандартом де-факто в мобильных приложениях, таких как финансовые инструменты, социальные сети, навигация по местоположению и конечно компьютерные игры.

Архитектура WebSocket соответствует модели управляемых событий Event-Driven, а код оптимизирован для минимизации overhead по трафику и latency при передаче данных по сети. В настоящее время широкополосных каналов Интернет – WebSocket позволяет в полной мере реализовать реактивные интерфейсы, «оживив» таким образом мобильное приложение в руках пользователя.

Источник: https://habr.com/ru/post/662692/


Интересные статьи

Интересные статьи

Как работают популярные счетчики веб или мобильной аналитики, например, Google Analytics или AppsFlyer? На сайт устанавливаются их коды или в приложение интегрируется мобильное SDK. Потом при каждом д...
Весна 2020-го началась с не самых приятных новостей. Сейчас, спустя год, многие компании стремятся помогать сотрудникам в вопросах, к которым раньше не было столь пристального внимани...
Довольно распространенная задача – создание превью картинок для сайта из полноразмерных изображений. Автоматизируем этот процесс с помощью триггера для Yandex Object Stor...
В нынешний век каждый уже слышал об облачных сервисах. Многие компании освоили этот сегмент рынка и создали свои облачные сервисы самых различных направлений. Наша команда также в последнее вре...
С версии 12.0 в Bitrix Framework доступно создание резервных копий в автоматическом режиме. Задание параметров автоматического резервного копирования производится в Административной части на странице ...