@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
final ServerHttpRequest request = exchange.getRequest();
long contentLength = request.getHeaders().getContentLength();
if (contentLength <= 0) {
return chain.filter(exchange);
}
return DataBufferUtils.join(request.getBody()).map(dataBuffer -> {
exchange.getAttributes().put("cachedRequestBody", dataBuffer);
ServerHttpRequest decorator = new ServerHttpRequestDecorator(request) {
@Override
public Flux<DataBuffer> getBody() {
return Mono.<DataBuffer>fromSupplier(() -> {
if (exchange.getAttributeOrDefault("cachedRequestBody", null) == null) {
// probably == downstream closed
return null;
}
// reset position
dataBuffer.readPosition(0);
// deal with Netty
NettyDataBuffer pdb = (NettyDataBuffer) dataBuffer;
return pdb.factory().wrap(pdb.getNativeBuffer().retainedSlice());
}).flux();
}
};
// TODO 消费 dataBuffer,例如计算 dataBuffer 的哈希值并验证
// ...
return decorator
})
.switchIfEmpty(Mono.just(request))
.flatMap(req -> chain.filter(exchange.mutate().request(req).build()))
.doFinally(s -> {
DataBuffer dataBuffer = exchange.getAttributeOrDefault("cachedRequestBody", null);
if (dataBuffer != null) {
DataBufferUtils.release(dataBuffer);
}
});
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
final ServerHttpRequest request = exchange.getRequest();
long contentLength = request.getHeaders().getContentLength();
if (contentLength <= 0) {
return chain.filter(exchange);
}
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
return Mono.create(sink -> {
DataBufferUtils.write(request.getBody(), outputStream).subscribe(DataBufferUtils::release, sink::error, sink::success);
})
.then(Mono.just(request))
.flatMap(req -> {
log.debug("缓存大小:{}", outputStream.size());
final ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(req) {
@Override
public Flux<DataBuffer> getBody() {
return DataBufferUtils.read(new ByteArrayResource(outputStream.toByteArray()), exchange.getResponse().bufferFactory(), 1024 * 8);
}
};
// TODO 对缓存的 ByteArrayOutputStream 进行处理,例如计算 ByteArrayOutputStream 中 byte[] 的哈希值并验证
// ...
return chain.filter(exchange.mutate().request(decorator).build());
});
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
final ServerHttpRequest request = exchange.getRequest();
long contentLength = request.getHeaders().getContentLength();
if (contentLength <= 0) {
return chain.filter(exchange);
}
try {
final Path tempFile = Files.createTempFile("HttpRequest", ".bin");
return DataBufferUtils.write(request.getBody(), tempFile)
.then(Mono.just(request))
.flatMap(req -> {
final ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(req) {
@Override
public Flux<DataBuffer> getBody() {
return DataBufferUtils.read(tempFile, exchange.getResponse().bufferFactory(), 1024 * 8, StandardOpenOption.READ);
}
};
// TODO 对缓存的 tempFile 进行处理,例如计算 tempFile 的哈希值并验证
// ...
return chain.filter(exchange.mutate().request(decorator).build());
})
.doFinally(s -> {
try {
Files.deleteIfExists(tempFile);
} catch (IOException e) {
throw new IllegalStateException(e);
}
});
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
在请求 body 比较大的情况的测试中,发现调用 DataBufferUtils#join() 方法(方法一)会占用较大的内存,并且请求完毕时可能不会立刻释放,在下一次 GC 时可释放。调用 DataBufferUtils#write() 方法直接写到 OutputStream (方法二)或者临时文件(方法三)时,则不会占用过多内存。
| Source |
|