Изучаю Scala: Часть 4 — WebSocket

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

Привет, Хабр! На этот раз я по пробовал сделать простенький чат через ВебСокеты. За подробностями добро пожаловать под кат.

Содержание


  • Изучаю Scala: Часть 1 — Игра змейка
  • Изучаю Scala: Часть 2 — Todo лист с возможностью загрузки картинок
  • Изучаю Scala: Часть 3 — Юнит Тесты
  • Изучаю Scala: Часть 4 — WebSocket

Ссылки


  1. Исходники
  2. Образы docker image
  3. Tapir
  4. Http4s
  5. Fs2
  6. Doobie
  7. ScalaTest
  8. ScalaCheck
  9. ScalaTestPlusScalaCheck


Собственно весь код находиться в одном объект ChatHub
class ChatHub[F[_]] private(
                             val topic: Topic[F, WebSocketFrame],
                             private val ref: Ref[F, Int]
                           )
                           (
                             implicit concurrent: Concurrent[F],
                             timer: Timer[F]
                           ) extends Http4sDsl[F] {

  val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("Подключится к общему чату. Например по такому адресу: ws://localhost:8080/chat")
    .description("Подключает к общему чату")
    .in(
      stringBody
        .description("Сообщение которое будет отправлено пользователям в чате")
        .example("Привет!")
    )
    .out(
      stringBody
        .description("Сообщение которое кто-то написал в чат")
        .example("6 : Сообщение от клиента с Id подключения f518a53d: Привет!")
    )
    //Заглушка которая всегда отвечает ошибкой. 
    .serverLogic(_ => IO(Left(()): Either[Unit, String]))

  def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }

  private def logic(): F[Response[F]] = {
    val toClient: Stream[F, WebSocketFrame] =
      topic.subscribe(1000)
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
      handle
    WebSocketBuilder[F].build(toClient, fromClient)
  }

  private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
    .through(topic.publish)
}

object ChatHub {

  def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {
    ref <- Ref.of[F, Int](0)
    topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))
  } yield new ChatHub(topic, ref)
}

Тут надо сразу сказать про Topic — примитив синхронизации из Fs2 который позволяет сделать модель Publisher — Subscriber причем у вас может быть много Publisher и одновременно много Subscriber. Вообще в него лучшее отправлять сообщения через какой-то буфер вроде Queue потому что у него есть ограничения на количество сообщения в очереди и Publisher ждет пока все Subscriber не получат сообщения в свою очередь сообщений и если она переполнена то может и зависнуть.
val topic: Topic[F, WebSocketFrame],

Тут еще я считаю количество сообщений которые были переданы в чат как номер каждого сообщения. Так как это мне нужно делать из разных потоков я использовал аналог Atomic который тут называется Ref и гарантирует атомарность операции.
  private val ref: Ref[F, Int]

Обработка потока сообщений от пользователей.
  private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] = 
    stream
//Достаем из фрейма текстовое сообщение и фильтруем фреймы. 
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
//Атомарно увеличиваем наш счетчик с сохранением нового значения и добавления его значения к тексту сообщения пользователя.
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
//Каждое пришедшее сообщение отправляем в топик
    .through(topic.publish)

Собственно сама логика создания сокета.
private def logic(): F[Response[F]] = {
//Откуда получать данные для клиента.
    val toClient: Stream[F, WebSocketFrame] =
//Просто подписываемся на данные которые будут приходить в топик
      topic.subscribe(1000)
//Что будем делать с данными которые приходить от клиента
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
//Просто отправляем данные в топик после обработки
      handle
//Создаем веб сокет с созданными ранее генератором и потребителем данных.
    WebSocketBuilder[F].build(toClient, fromClient)
  }

Связываем наш сокет с роутом на сервере (ws://localhost:8080/chat)
def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }

Собственно на этом все. Дальше уже можно запускать сервер с этим роутом. Мне еще захотелось какую ни какую документацию сделать. Вообще для документирования WebSocket и прочего основанного на событиях взаимодействия вроде RabbitMQ AMPQ есть AsynAPI но под Tapir там нет ничего поэтому просто сделал для Swagger описание эндпойнта как GET запрос. Работать он конечно не будет. Точнее 501 ошибку будет возвращать зато будет отображаться в Swagger
  val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("Подключится к общему чату. Например по такому адресу: ws://localhost:8080/chat")
    .description("Подключает к общему чату")
    .in(
      stringBody
        .description("Сообщение которое будет отправлено пользователям в чате")
        .example("Привет!")
    )
    .out(
      stringBody
        .description("Сообщение которое кто-то написал в чат")
        .example("6 : Сообщение от клиента с Id подключения f518a53d: Привет!")
    )

В самом сваггере это выглядит вот так

Подключаем наш чат к нашему серверу API
    todosController = new TodosController()
    imagesController = new ImagesController()
//Создаем объект нашего чата
    chatHub <- Resource.liftF(ChatHub[IO]())
    endpoints = todosController.endpoints ::: imagesController.endpoints
//Добавляем его эндпойнт в документацию Swagger
    docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")
    yml: String = docs.toYaml
//Добавляем его маршрут в список маршрутов приложения
    routes = chatHub.routeWs <+>
      endpoints.toRoutes <+>
      new SwaggerHttp4s(yml, "swagger").routes[IO]
    httpApp = Router(
      "/" -> routes
    ).orNotFound
    blazeServer <- BlazeServerBuilder[IO](serverEc)
      .bindHttp(settings.host.port, settings.host.host)
      .withHttpApp(httpApp)
      .resource

Подключаемся к чату крайне простым скриптом.
    <script>
        const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;
        const webSocket = new WebSocket('ws://localhost:8080/chat');

        webSocket.onopen = event => {
            alert('onopen ');
        };

        webSocket.onmessage = event => {
            console.log(event);
            receive(event.data);
        };

        webSocket.onclose = event => {
            alert('onclose ');
        };

        function send() {
            let text = document.getElementById("message");
            webSocket.send(`Сообщение от клиента с Id подключения ${id}: ${text.value}`);
            text.value = '';
        }

        function receive(m) {
            let text = document.getElementById("chat");
            text.value = text.value + '\n\r' + m;
        }
    </script>

На этом собственно все. Надеюсь кому-то кто тоже изучает скала будет интересна эта статья а может даже полезна.
Источник: https://habr.com/ru/post/517076/


Интересные статьи

Интересные статьи

Разбираясь с современными методами организации multicast VPN я заметил, что в сети не так много материала, описывающего принципы и детали работы технологий. На сайте вендора представлена ...
Приветствую, дорогие любители и профессионалы, программисты графики! Приступаем ко второй части нашего цикла статей про оптимизацию рендера под Mobile. В этой части мы будем рассматривать основны...
У-у-у… Кто обитает на дне океана? Cubicity — квадратная игра. Привет всем, кто ждал продолжения первой части нашей статьи. Настало время подвести итоги релиза Cubicity. Это был долгий путь с д...
Если у вас есть интернет-магазин и вы принимаете платежи через Интернет, то с 01 июля 2017 года у вас есть онлайн-касса.
Где-то в 1989-90 году я стал более-менее регулярно играть на РС, а не время от времени. Нас на выходные брали на работу и почти целый день у нас на троих в распоряжении имелось два компьютера с и...