数据缓冲区和编解码器
Java NIO 提供了许多库在顶部构建了自己的字节缓冲区 API,
特别是对于重用缓冲区和/或使用直接缓冲区的网络操作
有利于性能。例如,Netty 具有层次结构,Undertow 使用
XNIO,Jetty 使用池化字节缓冲区和要释放的回调,依此类推。
该模块提供了一组抽象来处理各种字节缓冲区
API 如下:ByteBuffer
ByteBuf
spring-core
-
DataBufferFactory
抽象化了数据缓冲区的创建。 -
DataBuffer
表示一个字节缓冲区,它可以被池化。 -
DataBufferUtils
为数据缓冲区提供了 Util 方法。 -
编解码器将数据缓冲区流解码或编码为更高级别的对象。
DataBufferFactory
DataBufferFactory
用于通过以下两种方式之一创建数据缓冲区:
-
分配新的数据缓冲区,可以选择预先指定容量(如果已知),即 即使 的实现可以按需增长和收缩,也更高效。
DataBuffer
-
包装现有的 或 ,它用 一个实现,并且不涉及分配。
byte[]
java.nio.ByteBuffer
DataBuffer
请注意,WebFlux 应用程序不会直接创建一个
通过客户端上的 或 访问它。
工厂的类型取决于底层客户端或服务器,例如,对于 Reactor Netty,对于其他客户端或服务器。DataBufferFactory
ServerHttpResponse
ClientHttpRequest
NettyDataBufferFactory
DefaultDataBufferFactory
DataBuffer
该界面提供与 but also 类似的操作
带来一些额外的好处,其中一些是受到 Netty 的启发。
以下是部分好处列表:DataBuffer
java.nio.ByteBuffer
ByteBuf
-
使用独立位置进行读写,即不需要调用 to 在 read 和 write 之间交替。
flip()
-
容量按需扩展,如 。
java.lang.StringBuilder
-
通过
PooledDataBuffer
进行池化缓冲区和引用计数。 -
以 、 或 .
java.nio.ByteBuffer
InputStream
OutputStream
-
确定给定字节的索引或最后一个索引。
PooledDataBuffer
如 ByteBuffer 的 Javadoc 中所述, 字节缓冲区可以是 direct 或 non-direct。直接缓冲区可能位于 Java 堆之外 这样就无需复制本机 I/O 操作。这使得直接缓冲区 对于通过 socket 接收和发送数据特别有用,但它们也更多 创建和发布成本高昂,这导致了池化缓冲区的想法。
PooledDataBuffer
是 的扩展,它有助于进行引用计数
对于字节缓冲池至关重要。它是如何工作的?当 为
allocated 的引用计数为 1。调用 Count,而
调用来递减它。只要计数大于 0,缓冲区
保证不会被释放。当计数减少到 0 时,池化缓冲区可以是
released,这实际上可能意味着缓冲区的预留内存将返回到
内存池。DataBuffer
PooledDataBuffer
retain()
release()
请注意,在大多数情况下,与其直接操作 on 更好
使用其中的便捷方法,仅当 a 是 的实例时,才将 release 或 retain 应用于 。PooledDataBuffer
DataBufferUtils
DataBuffer
PooledDataBuffer
DataBufferUtils
DataBufferUtils
提供了许多 Utility 方法来操作数据缓冲区:
-
将数据缓冲区流加入单个缓冲区中,可能具有零拷贝,例如,通过 复合缓冲区,如果底层字节缓冲区 API 支持的话。
-
将 或 NIO 转换为 ,反之亦然 a 变为 或 NIO 。
InputStream
Channel
Flux<DataBuffer>
Publisher<DataBuffer>
OutputStream
Channel
-
如果缓冲区是 的实例,则释放或保留 的方法。
DataBuffer
PooledDataBuffer
-
跳过或获取字节流,直到达到特定的字节计数。
Codec
该软件包提供以下策略接口:org.springframework.core.codec
-
Encoder
编码为数据缓冲区流。Publisher<T>
-
Decoder
解码为更高级别对象的流。Publisher<DataBuffer>
该模块提供 、 、 、 以及 编码器 和 解码器 实现。该模块添加了 Jackson JSON、
Jackson Smile、JAXB2、Protocol Buffers 和其他编码器和解码器。请参阅 WebFlux 部分中的编解码器。spring-core
byte[]
ByteBuffer
DataBuffer
Resource
String
spring-web
用DataBuffer
使用数据缓冲区时,必须特别小心以确保释放缓冲区 因为它们可能是共用的。我们将使用编解码器来说明 这是如何运作的,但概念更普遍地适用。让我们看看编解码器必须做什么 内部管理数据缓冲区。
A 是在创建更高级别之前最后读取 input data buffers 的
对象,因此它必须按如下方式释放它们:Decoder
-
如果 a 只是读取每个输入缓冲区并准备好 立即释放它,它可以通过 来实现。
Decoder
DataBufferUtils.release(dataBuffer)
-
如果 a 正在使用 或 运算符,例如 、 、 和 其他在内部预取和缓存数据项,或者使用运算符(如 、 、 )和其他省略项的运算符,则必须添加到 组合链来确保此类缓冲区在被丢弃之前被释放(可能 也是由于错误或取消信号的结果。
Decoder
Flux
Mono
flatMap
reduce
filter
skip
doOnDiscard(DataBuffer.class, DataBufferUtils::release)
-
如果 a 以任何其他方式保留一个或多个数据缓冲区,则它必须 确保在完全读取时释放它们,或者在出现错误或取消信号时释放它们 在读取和释放缓存的数据缓冲区之前进行。
Decoder
请注意,提供了一种安全有效的方法来聚合数据
buffer 流传输到单个数据缓冲区中。同样,和 是解码器可以使用的其他安全方法。DataBufferUtils#join
skipUntilByteCount
takeUntilByteCount
An 分配其他人必须读取 (和释放) 的数据缓冲区。所以 an 没有太多事情要做。但是,如果
使用数据填充缓冲区时发生序列化错误。例如:Encoder
Encoder
Encoder
-
Java
-
Kotlin
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
// serialize and populate buffer..
release = false
} finally {
if (release) {
DataBufferUtils.release(buffer)
}
}
return buffer
的使用者负责释放它接收的数据缓冲区。
在 WebFlux 应用程序中,输出用于写入 HTTP 服务器
响应或客户端 HTTP 请求,在这种情况下,释放数据缓冲区是
负责将代码写入服务器响应或客户端请求。Encoder
Encoder
请注意,在 Netty 上运行时,有一些调试选项可用于排查缓冲区泄漏问题。