Scala lernen: Teil 4 - WebSocket



Hallo Habr! Diesmal habe ich versucht, einen einfachen Chat über Websockets zu führen. Für Details willkommen unter Katze.



Inhalt





Links



  1. Quellcodes
  2. Docker-Bilder
  3. Tapir
  4. HTTP4s
  5. Fs2
  6. Doobie
  7. ScalaTest
  8. ScalaCheck
  9. ScalaTestPlusScalaCheck


Tatsächlich befindet sich der gesamte Code in einem ChatHub-Objekt



class ChatHub[F[_]] private(
                             val topic: Topic[F, WebSocketFrame],
                             private val ref: Ref[F, Int]
                           )
                           (
                             implicit concurrent: Concurrent[F],
                             timer: Timer[F]
                           ) extends Http4sDsl[F] {

  val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("   .    : ws://localhost:8080/chat")
    .description("   ")
    .in(
      stringBody
        .description("      ")
        .example("!")
    )
    .out(
      stringBody
        .description("  -   ")
        .example("6 :     Id  f518a53d: !")
    )
    //    . 
    .serverLogic(_ => IO(Left(()): Either[Unit, String]))

  def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }

  private def logic(): F[Response[F]] = {
    val toClient: Stream[F, WebSocketFrame] =
      topic.subscribe(1000)
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
      handle
    WebSocketBuilder[F].build(toClient, fromClient)
  }

  private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
    .through(topic.publish)
}

object ChatHub {

  def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {
    ref <- Ref.of[F, Int](0)
    topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))
  } yield new ChatHub(topic, ref)
}


Hier müssen Sie sofort über Topic sprechen - ein Synchronisationsprimitiv aus Fs2, mit dem Sie ein Publisher-Subscriber-Modell erstellen können, und Sie können viele Publisher und viele Subscriber gleichzeitig haben. Im Allgemeinen ist es besser, Nachrichten über einen Puffer wie "Warteschlange" an ihn zu senden, da die Anzahl der Nachrichten in der Warteschlange begrenzt ist und der Publisher wartet, bis alle Abonnenten Nachrichten in seiner Nachrichtenwarteschlange empfangen.



val topic: Topic[F, WebSocketFrame],


Hier zähle ich auch die Anzahl der Nachrichten, die an den Chat gesendet wurden, als die Anzahl jeder Nachricht. Da ich dies von verschiedenen Threads aus tun muss, habe ich ein Analogon von Atomic verwendet, das hier Ref heißt und die Atomizität der Operation garantiert.



  private val ref: Ref[F, Int]


Verarbeiten eines Nachrichtenstroms von Benutzern.



  private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] = 
    stream
//       . 
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
//               .
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
//     
    .through(topic.publish)


Eigentlich die Logik, einen Socket zu erstellen.



private def logic(): F[Response[F]] = {
//    .
    val toClient: Stream[F, WebSocketFrame] =
//        
      topic.subscribe(1000)
//        
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
//      
      handle
//         .
    WebSocketBuilder[F].build(toClient, fromClient)
  }


Wir binden unseren Socket an die Route auf dem Server (ws: // localhost: 8080 / chat)



def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }


Eigentlich ist das alles. Dann können Sie den Server mit dieser Route starten. Ich wollte immer noch irgendeine Art von Dokumentation machen. Im Allgemeinen gibt es für die Dokumentation von WebSocket und anderen ereignisbasierten Interaktionen wie RabbitMQ AMPQ AsynAPI, aber unter Tapir gibt es nichts. Daher habe ich den Endpunkt für Swagger nur als GET-Anforderung beschrieben. Natürlich wird er nicht arbeiten. Genauer gesagt wird ein 501-Fehler zurückgegeben, der jedoch in Swagger angezeigt wird



  val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("   .    : ws://localhost:8080/chat")
    .description("   ")
    .in(
      stringBody
        .description("      ")
        .example("!")
    )
    .out(
      stringBody
        .description("  -   ")
        .example("6 :     Id  f518a53d: !")
    )


In der Prahlerei selbst sieht es so aus. Verbinden Sie







unseren Chat mit unserem API-Server



    todosController = new TodosController()
    imagesController = new ImagesController()
//   
    chatHub <- Resource.liftF(ChatHub[IO]())
    endpoints = todosController.endpoints ::: imagesController.endpoints
//     Swagger
    docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")
    yml: String = docs.toYaml
//      
    routes = chatHub.routeWs <+>
      endpoints.toRoutes <+>
      new SwaggerHttp4s(yml, "swagger").routes[IO]
    httpApp = Router(
      "/" -> routes
    ).orNotFound
    blazeServer <- BlazeServerBuilder[IO](serverEc)
      .bindHttp(settings.host.port, settings.host.host)
      .withHttpApp(httpApp)
      .resource


Wir verbinden uns mit dem Chat mit einem extrem einfachen Skript.



    <script>
        const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;
        const webSocket = new WebSocket('ws://localhost:8080/chat');

        webSocket.onopen = event => {
            alert('onopen ');
        };

        webSocket.onmessage = event => {
            console.log(event);
            receive(event.data);
        };

        webSocket.onclose = event => {
            alert('onclose ');
        };

        function send() {
            let text = document.getElementById("message");
            webSocket.send(`    Id  ${id}: ${text.value}`);
            text.value = '';
        }

        function receive(m) {
            let text = document.getElementById("chat");
            text.value = text.value + '\n\r' + m;
        }
    </script>


Das ist eigentlich alles. Ich hoffe, jemand, der auch den Rock studiert, wird an diesem Artikel interessiert und vielleicht sogar nützlich sein.



All Articles