Running as a Netty-based server
To expose an endpoint using a Netty-based server, first add the following dependency:
// if you want to use Java 21+ Virtual Threads & direct-style:
"com.softwaremill.sttp.tapir" %% "tapir-netty-server-sync" % "1.13.20"
// if you are using Future:
"com.softwaremill.sttp.tapir" %% "tapir-netty-server" % "1.13.20"
// if you are using cats-effect:
"com.softwaremill.sttp.tapir" %% "tapir-netty-server-cats" % "1.13.20"
// if you are using zio:
"com.softwaremill.sttp.tapir" %% "tapir-netty-server-zio" % "1.13.20"
Then, use:
NettySyncServer().addEndpoints to expose direct-style server endpoints (using Virtual Threads). Streaming & WebSockets are supported with Ox Flows.
NettyFutureServer().addEndpoints to expose Future-based server endpoints.
NettyCatsServer().addEndpoints to expose F-based server endpoints, where F is any cats-effect supported effect. Streaming & WebSockets are supported with fs2.
NettyZioServer().addEndpoints to expose ZIO-based server endpoints, where R represents the ZIO environment requirements. Streaming & WebSockets are supported with ZIO Streams.
These methods require a single ServerEndpoint or a list of them, which can be
created by adding server logic to an endpoint.
For example, using direct style:
import sttp.tapir.*
import sttp.tapir.server.netty.sync.NettySyncServer
val helloWorld = endpoint
  .get
  .in("hello").in(query[String]("name"))
  .out(stringBody)
  .handleSuccess(name => s"Hello, $name!")
NettySyncServer().addEndpoint(helloWorld).startAndWait()
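The Future-based server follows the same shape. The sketch below assumes a scala-cli setup (the `//> using` lines) and the `tapir-netty-server` dependency listed above. The names `helloWorldFuture` and `runFutureServer`, the port and the timeouts are illustrative choices, not part of the library.

```scala
//> using scala 3
//> using dep com.softwaremill.sttp.tapir::tapir-netty-server:1.13.20

import sttp.tapir.*
import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerBinding}

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.*

// Same endpoint as in the direct-style example, but with Future-based server logic
val helloWorldFuture = endpoint.get
  .in("hello").in(query[String]("name"))
  .out(stringBody)
  .serverLogicSuccess[Future](name => Future.successful(s"Hello, $name!"))

@main def runFutureServer(): Unit =
  // start() binds asynchronously; block here only to keep the sketch short
  val binding: NettyFutureServerBinding =
    Await.result(NettyFutureServer().port(8080).addEndpoint(helloWorldFuture).start(), 10.seconds)
  println(s"Listening on port ${binding.port}, press ENTER to stop")
  scala.io.StdIn.readLine()
  // stop() releases the server resources
  Await.result(binding.stop(), 15.seconds)
```

In a real application you would typically compose the returned Future instead of blocking with Await.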
Direct-style
The tapir-netty-server-sync module provides a direct-style server using Netty
behind the scenes. The implementation uses Identity[T] as the “effect” type.
Identity[A] simplifies to just A, representing direct style. The module is
available only for Scala 3.
See examples labeled with Direct.
To provide server logic for an endpoint when using the -sync server, you can
use the dedicated handle... methods and their variants, which provide better
type inference.
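As a minimal sketch of the handle... family, the example below uses handle, which takes a function from the endpoint's input to an Either of error or success output. The helper `parsePositive`, the endpoint path and the query parameter name are hypothetical and exist only for illustration. The `//> using` lines assume scala-cli.

```scala
//> using scala 3
//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-sync:1.13.20

import sttp.tapir.*
import sttp.tapir.server.netty.sync.NettySyncServer

// Hypothetical helper: Left carries the error message, Right the parsed value
def parsePositive(raw: String): Either[String, Int] =
  raw.toIntOption.filter(_ > 0).toRight(s"Not a positive integer: $raw")

val doubleEndpoint = endpoint.get
  .in("double").in(query[String]("n"))
  .errorOut(stringBody)
  .out(stringBody)
  // handle: input => Either[error, output]; no effect wrapper is needed in direct style
  .handle(n => parsePositive(n).map(i => (i * 2).toString))

@main def runDoubleServer(): Unit =
  NettySyncServer().addEndpoint(doubleEndpoint).startAndWait()
```

handleSuccess, used in the earlier example, is the variant for logic that cannot fail.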
To learn more about handling concurrency and streaming with Ox and Flows, see
its documentation.
Configuration
The interpreters can be configured by providing a
Netty[Sync|Future|...]ServerOptions value, see server options
for details.
Some options can be configured directly using a Netty[Sync|Future|...]Server
instance, such as the host and port. Others can be passed using the
Netty[Sync|Future|...]Server(options) methods. Options may also be overridden
when adding endpoints. For example:
import sttp.tapir.server.netty.NettyConfig
import sttp.tapir.server.netty.sync.{NettySyncServer, NettySyncServerOptions}
// customising the port
NettySyncServer().port(9090).addEndpoints(???)
// customising the interceptors
NettySyncServer(NettySyncServerOptions.customiseInterceptors.serverLog(None).options)
// customise Netty config
NettySyncServer(NettyConfig.default.socketBacklog(256))
Note
Unlike other server interpreters, the Netty-based servers are by default
configured to return a 404, in case none of the given endpoints match a request.
This can be changed by using a different RejectHandler.
This is due to the fact that usually no other routes (other than generated from Tapir’s endpoints) are added to a Netty server.
Server socket configuration
NettyConfig exposes a number of configuration options that allow you to
customise the server socket, such as:
request timeout
connection timeout
linger timeout
graceful shutdown timeout: when stopped e.g. using NettySyncServerBinding.stop(), it’s ensured that the server will wait at most 10 seconds for in-flight requests to complete, while rejecting all new requests with 503 during this period; afterwards, all server resources are closed
server header
maximum number of connections
custom netty pipeline & low-level logging handlers
For example, to change the request timeout:
import sttp.tapir.server.netty.NettyConfig
import scala.concurrent.duration.*
val config = NettyConfig.default.requestTimeout(5.seconds)
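Several of these options can be chained and the resulting config passed to a server. The sketch below assumes scala-cli. Only requestTimeout and socketBacklog appear elsewhere on this page; connectionTimeout, withGracefulShutdownTimeout and maxConnections are assumed builder names that should be checked against the NettyConfig of your Tapir version. The `ping` endpoint and all values are illustrative.

```scala
//> using scala 3
//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-sync:1.13.20

import sttp.tapir.*
import sttp.tapir.server.netty.NettyConfig
import sttp.tapir.server.netty.sync.NettySyncServer

import scala.concurrent.duration.*

// Each call returns a new NettyConfig, so the options can be chained
val tunedConfig: NettyConfig = NettyConfig.default
  .requestTimeout(5.seconds)
  .socketBacklog(256)
  .connectionTimeout(10.seconds)          // assumed method name
  .withGracefulShutdownTimeout(5.seconds) // assumed method name
  .maxConnections(1000)                   // assumed method name

// Illustrative endpoint, only so the server has something to serve
val ping = endpoint.get.in("ping").out(stringBody).handleSuccess(_ => "pong")

@main def runTunedServer(): Unit =
  NettySyncServer(tunedConfig).port(8080).addEndpoint(ping).startAndWait()
```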
Web sockets
tapir-netty-server-sync
In the Loom-based backend, Tapir uses Ox to
manage concurrency, and your transformation pipeline should be represented as
Flow[A] => Flow[B]. Any forks started within this function will be run under a
safely isolated internal scope. See
examples/websocket/WebSocketNettySyncServer.scala
for a full example.
Note
The pipeline transforms a source of incoming web socket messages (received from the client), into a source of outgoing web socket messages (which will be sent to the client), within some concurrency scope. Once the incoming source is done, the client has closed the connection. In that case, remember to close the outgoing source as well: otherwise the scope will leak and won’t be closed. An error will be logged if the outgoing channel is not closed within a timeout after a close frame is received.
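A minimal sketch of such a pipeline, modelled on the referenced example and assuming scala-cli: the endpoint upper-cases every incoming text message. Because the outgoing Flow is derived directly from the incoming one with map, it completes when the client closes the connection, which satisfies the requirement in the note above. The names `upperCasePipe` and `runWsServer` are illustrative.

```scala
//> using scala 3
//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-sync:1.13.20

import sttp.tapir.*
import sttp.tapir.server.netty.sync.{NettySyncServer, OxStreams}
import sttp.tapir.server.netty.sync.OxStreams.Pipe

// Web socket endpoint exchanging plain-text frames
val wsEndpoint = endpoint.get
  .in("ws")
  .out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](OxStreams))

// Pipe[A, B] stands for Flow[A] => Flow[B]; the output ends when the input ends
val upperCasePipe: Pipe[String, String] = incoming => incoming.map(_.toUpperCase)

val wsServerEndpoint = wsEndpoint.handleSuccess(_ => upperCasePipe)

@main def runWsServer(): Unit =
  NettySyncServer().addEndpoint(wsServerEndpoint).startAndWait()
```

A pipeline that produces messages independently of the incoming ones has to complete its output explicitly once the incoming Flow is done.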
tapir-netty-server-cats
The Cats Effect interpreter supports web sockets, with pipes of type
fs2.Pipe[F, REQ, RESP]. See web sockets for more
details.
To create a web socket endpoint, use Tapir’s out(webSocketBody) output type:
import cats.effect.kernel.Resource
import cats.effect.{IO, ResourceApp}
import cats.syntax.all.*
import fs2.Pipe
import sttp.capabilities.fs2.Fs2Streams
import sttp.tapir.*
import sttp.tapir.server.netty.cats.NettyCatsServer
import sttp.ws.WebSocketFrame
import scala.concurrent.duration.*
object WebSocketsNettyCatsServer extends ResourceApp.Forever {
  // Web socket endpoint
  val wsEndpoint =
    endpoint.get
      .in("ws")
      .out(
        webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](Fs2Streams[IO])
          .concatenateFragmentedFrames(false) // All these options are supported by tapir-netty
          .ignorePong(true)
          .autoPongOnPing(true)
          .decodeCloseRequests(false)
          .decodeCloseResponses(false)
          .autoPing(Some((10.seconds, WebSocketFrame.Ping("ping-content".getBytes))))
      )
  // Your processor transforming a stream of requests into a stream of responses
  val pipe: Pipe[IO, String, String] = requestStream => requestStream.evalMap(str => IO.pure(str.toUpperCase))
  // Alternatively, requests can be ignored and the backend can be turned into a stream emitting frames to the client:
  // val pipe: Pipe[IO, String, String] = requestStream => someDataEmittingStream.concurrently(requestStream.as(()))
  val wsServerEndpoint = wsEndpoint.serverLogicSuccess(_ => IO.pure(pipe))
  // A regular /GET endpoint
  val helloWorldEndpoint: PublicEndpoint[String, Unit, String, Any] =
    endpoint.get.in("hello").in(query[String]("name")).out(stringBody)
  val helloWorldServerEndpoint = helloWorldEndpoint
    .serverLogicSuccess(name => IO.pure(s"Hello, $name!"))
  override def run(args: List[String]) = NettyCatsServer
    .io()
    .flatMap { server =>
      Resource
        .make(
          server
            .port(8080)
            .host("localhost")
            .addEndpoints(List(wsServerEndpoint, helloWorldServerEndpoint))
            .start()
        )(_.stop())
        .as(())
    }
}
Response compression
The Netty server supports automatic HTTP response compression using gzip or deflate encoding. When enabled, the server will:
Inspect the client’s Accept-Encoding header
Compress responses when the client supports gzip or deflate
Add the appropriate Content-Encoding header to compressed responses
Compression is disabled by default. To enable it:
import sttp.tapir.server.netty.{NettyConfig, NettyCompressionConfig, NettyFutureServer}
import scala.concurrent.ExecutionContext.Implicits.global
// Enable compression with Netty's default settings
val config1 = NettyConfig.default.withCompressionEnabled
// Or use the compression config explicitly
val config2 = NettyConfig.default.compressionConfig(NettyCompressionConfig.enabled)
// Start server with compression enabled
NettyFutureServer(config1).addEndpoints(???)
When to use compression
Compression is most beneficial for:
Large text responses (JSON, XML, HTML, etc.)
Responses over slow networks
APIs with high bandwidth usage
The compression is applied automatically by Netty based on the client’s
Accept-Encoding header. All responses that match the client’s accepted
encodings will be compressed using Netty’s default compression settings.
Server Sent Events
tapir-netty-server-sync
The interpreter supports SSE (Server Sent Events).
For example, to define an endpoint that returns an event stream:
import ox.flow.Flow
import sttp.model.sse.ServerSentEvent
import sttp.tapir.*
import sttp.tapir.server.netty.sync.serverSentEventsBody
import scala.concurrent.duration.*
val sseEndpoint = endpoint.get.out(serverSentEventsBody)
val sseFlow = Flow
  .tick(1.second) // emit a new event every second
  .take(10)
  .map(_ => s"Event at ${System.currentTimeMillis()}")
  .map(event => ServerSentEvent(data = Some(event)))
val sseServerEndpoint = sseEndpoint.handleSuccess(_ => sseFlow)
Domain socket support
A Unix domain socket can be used instead of TCP for handling traffic:
import sttp.tapir.*
import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureDomainSocketBinding}
import java.nio.file.Paths
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import io.netty.channel.unix.DomainSocketAddress
val serverBinding: Future[NettyFutureDomainSocketBinding] =
  NettyFutureServer().addEndpoint(
    endpoint.get.in("hello").in(query[String]("name")).out(stringBody).serverLogic(name =>
      Future.successful[Either[Unit, String]](Right(s"Hello, $name!")))
  )
  .startUsingDomainSocket(Paths.get(System.getProperty("java.io.tmpdir"), "hello"))
Logging
By default, logging of handled requests and exceptions is enabled, and uses an slf4j logger.