Hallo Habr! Diesmal habe ich versucht, einen einfachen Chat über Websockets zu führen. Für Details willkommen unter Katze.
Inhalt
- Scala lernen: Teil 1 - Schlangenspiel
- Lernskala: Teil 2 - Todo Sheet mit Bild-Uploads
- Lernskala: Teil 3 - Unit Tests
- Scala lernen: Teil 4 - WebSocket
Links
Tatsächlich befindet sich der gesamte Code in einem ChatHub-Objekt
class ChatHub[F[_]] private(
val topic: Topic[F, WebSocketFrame],
private val ref: Ref[F, Int]
)
(
implicit concurrent: Concurrent[F],
timer: Timer[F]
) extends Http4sDsl[F] {
val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint
.get
.in("chat")
.tag("WebSockets")
.summary(" . : ws://localhost:8080/chat")
.description(" ")
.in(
stringBody
.description(" ")
.example("!")
)
.out(
stringBody
.description(" - ")
.example("6 : Id f518a53d: !")
)
// .
.serverLogic(_ => IO(Left(()): Either[Unit, String]))
def routeWs: HttpRoutes[F] = {
HttpRoutes.of[F] {
case GET -> Root / "chat" => logic()
}
}
private def logic(): F[Response[F]] = {
val toClient: Stream[F, WebSocketFrame] =
topic.subscribe(1000)
val fromClient: Pipe[F, WebSocketFrame, Unit] =
handle
WebSocketBuilder[F].build(toClient, fromClient)
}
private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s
.collect({
case WebSocketFrame.Text(text, _) => text
})
.evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
.through(topic.publish)
}
object ChatHub {
def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {
ref <- Ref.of[F, Int](0)
topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))
} yield new ChatHub(topic, ref)
}
Hier müssen Sie sofort über Topic sprechen - ein Synchronisationsprimitiv aus Fs2, mit dem Sie ein Publisher-Subscriber-Modell erstellen können, und Sie können viele Publisher und viele Subscriber gleichzeitig haben. Im Allgemeinen ist es besser, Nachrichten über einen Puffer wie "Warteschlange" an ihn zu senden, da die Anzahl der Nachrichten in der Warteschlange begrenzt ist und der Publisher wartet, bis alle Abonnenten Nachrichten in seiner Nachrichtenwarteschlange empfangen.
val topic: Topic[F, WebSocketFrame],
Hier zähle ich auch die Anzahl der Nachrichten, die an den Chat gesendet wurden, als die Anzahl jeder Nachricht. Da ich dies von verschiedenen Threads aus tun muss, habe ich ein Analogon von Atomic verwendet, das hier Ref heißt und die Atomizität der Operation garantiert.
private val ref: Ref[F, Int]
Verarbeiten eines Nachrichtenstroms von Benutzern.
private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] =
stream
// .
.collect({
case WebSocketFrame.Text(text, _) => text
})
// .
.evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
//
.through(topic.publish)
Eigentlich die Logik, einen Socket zu erstellen.
private def logic(): F[Response[F]] = {
// .
val toClient: Stream[F, WebSocketFrame] =
//
topic.subscribe(1000)
//
val fromClient: Pipe[F, WebSocketFrame, Unit] =
//
handle
// .
WebSocketBuilder[F].build(toClient, fromClient)
}
Wir binden unseren Socket an die Route auf dem Server (ws: // localhost: 8080 / chat)
def routeWs: HttpRoutes[F] = {
HttpRoutes.of[F] {
case GET -> Root / "chat" => logic()
}
}
Eigentlich ist das alles. Dann können Sie den Server mit dieser Route starten. Ich wollte immer noch irgendeine Art von Dokumentation machen. Im Allgemeinen gibt es für die Dokumentation von WebSocket und anderen ereignisbasierten Interaktionen wie RabbitMQ AMPQ AsynAPI, aber unter Tapir gibt es nichts. Daher habe ich den Endpunkt für Swagger nur als GET-Anforderung beschrieben. Natürlich wird er nicht arbeiten. Genauer gesagt wird ein 501-Fehler zurückgegeben, der jedoch in Swagger angezeigt wird
val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
.get
.in("chat")
.tag("WebSockets")
.summary(" . : ws://localhost:8080/chat")
.description(" ")
.in(
stringBody
.description(" ")
.example("!")
)
.out(
stringBody
.description(" - ")
.example("6 : Id f518a53d: !")
)
In der Prahlerei selbst sieht es so aus. Verbinden Sie
unseren Chat mit unserem API-Server
todosController = new TodosController()
imagesController = new ImagesController()
//
chatHub <- Resource.liftF(ChatHub[IO]())
endpoints = todosController.endpoints ::: imagesController.endpoints
// Swagger
docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")
yml: String = docs.toYaml
//
routes = chatHub.routeWs <+>
endpoints.toRoutes <+>
new SwaggerHttp4s(yml, "swagger").routes[IO]
httpApp = Router(
"/" -> routes
).orNotFound
blazeServer <- BlazeServerBuilder[IO](serverEc)
.bindHttp(settings.host.port, settings.host.host)
.withHttpApp(httpApp)
.resource
Wir verbinden uns mit dem Chat mit einem extrem einfachen Skript.
<script>
const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;
const webSocket = new WebSocket('ws://localhost:8080/chat');
webSocket.onopen = event => {
alert('onopen ');
};
webSocket.onmessage = event => {
console.log(event);
receive(event.data);
};
webSocket.onclose = event => {
alert('onclose ');
};
function send() {
let text = document.getElementById("message");
webSocket.send(` Id ${id}: ${text.value}`);
text.value = '';
}
function receive(m) {
let text = document.getElementById("chat");
text.value = text.value + '\n\r' + m;
}
</script>
Das ist eigentlich alles. Ich hoffe, jemand, der auch den Rock studiert, wird an diesem Artikel interessiert und vielleicht sogar nützlich sein.