Netty实现文件下载
实例:如何使用 Netty 下载文件_channelhandlercontext下载文件-CSDN博客
ChannelHandler
自定义 ChannelHandler ,用来处理 Channel 里面的事件,写数据处理逻辑的。
ChannelInboundHandlerAdapter
SimpleChannelInboundHandler
是 ChannelInboundHandlerAdapter 的子类,能够指定类型。
Netty 里面预设了很多 ChannelHandler
java
ch.pipeline().addLast("decoder",new HttpRequestDecoder());
ch.pipeline().addLast("encoder",new HttpResponseEncoder());
ch.pipeline().addLast("inflater",new HttpContentCompressor());
ch.pipeline().addLast("deflater",new HttpContentDecompressor());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(10*1024*1024));
//文件下载需要用到该 ChannelHandler,用来处理chunked类型请求
ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
下载文件用到了 ChunkedWriteHandler
java
@ChannelHandler.Sharable
public class BootNettyChannelInboundHandlerAdapter extends SimpleChannelInboundHandler<FullHttpRequest> {
private FullHttpRequest request;
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
//处理进来的网络数据
this.request = fullHttpRequest;
System.out.println("请求进来了");
String uri = request.uri();
if (uri.contains("down")) {
downloadFile(channelHandlerContext);
}
}
/*连接建立以后*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(
"Hello Netty", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
@SneakyThrows
public void downloadFile(ChannelHandlerContext ctx) {
String path = "D:\\IdeaWorkSpace\\JavaAdvanced\\netty-practice\\file\\test.txt";
File file = new File(path);
RandomAccessFile raf;
raf = new RandomAccessFile(file, "r");
long fileLength = raf.length();
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
HttpUtil.setContentLength(response, fileLength);
//设置请求头部
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream; charset=UTF-8");
response.headers().add(HttpHeaderNames.CONTENT_DISPOSITION,
"attachment; filename=\"" + URLEncoder.encode(file.getName(), "UTF-8") + "\";");
ctx.write(response);
// Write the content.
ChannelFuture sendFileFuture;
sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),ctx.newProgressivePromise());
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
if (total < 0) { // total unknown
System.err.println(future.channel() + " Transfer progress: " + progress);
} else {
System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
}
}
@Override
public void operationComplete(ChannelProgressiveFuture future) {
System.err.println(future.channel() + " Transfer complete.");
}
});
//刷新缓冲区数据,文件结束标志符
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
}
java
public class HttpDataHandlerAdapter extends ChannelInboundHandlerAdapter{
private static final Logger logger = LogManager.getLogger(HttpDataHandlerAdapter.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//判断类型,然后强转
if(msg instanceof FullHttpRequest){
FullHttpRequest req = (FullHttpRequest)msg;
try {
String uri = req.getUri();
ByteBuf buf = req.content();
byte[] reqBody = new byte[buf.readableBytes()];
buf.readBytes(reqBody);
FullHttpResponse resp;
HttpHeaders headers = req.headers();
String contentType = headers.get("Content-Type");
if (StringUtils.isNotBlank(uri) && uri.contains(Constants.downLoadFile)) {
logger.info("获取请求url:{},headers:{}", uri, headers);
downloadFile(ctx, uri);
return;
}
if (StringUtils.isNotBlank(uri) && uri.contains(Constants.detectDownLoadFile)) {
logger.info("获取请求url:{},headers:{}", uri, headers);
detectTask(ctx, uri);
return;
}
HttpDataHandler handler = HttpDataHandlerFactory.getHandlerByContentType(contentType);
if (handler==null){
logger.warn("Receiver a HttpMessage from endpoint:{},contentType{},but No Handler can process this",uri,contentType);
resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE);
}else{
Boolean ok = handler.processHttpDataAsync(uri,reqBody,headers);
if (!ok) {
resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.TOO_MANY_REQUESTS);
} else {
resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
}
}
// close channel
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
} catch (Exception e) {
logger.info("发生异常:" + e.getMessage());
} finally {
req.release();
}
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// 当出现异常就关闭连接
logger.warn("Handler Http Message Failed! exception:\n{}",cause);
ctx.close();
}
private void downloadFile(ChannelHandlerContext ctx, String uri) throws IOException {
HttpResponse resp;
String filePath = uri.split("path=")[1];
Integer index = filePath.lastIndexOf("/");
String fileName = filePath.substring(index + 1);
String path = Configuration
.FILE_UPLOAD_PATH + filePath;
File file = new File(CleanPathUtil.cleanString(path));
if (!file.exists()) {
logger.error("未找到指定文件:{}", path);
resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
}
RandomAccessFile raf = new RandomAccessFile(file, "r");
long fileLength = raf.length();
resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
resp.headers().set(HttpHeaderNames.CONTENT_DISPOSITION, "attachment;filename=\"" + URLEncoder.encode(fileName, "UTF-8") + "\";");
ctx.write(resp);
ChannelFuture sendFileFuture = ctx.write(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), ctx.newProgressivePromise());
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture channelProgressiveFuture, long progress, long total) throws Exception {
if (total < 0) {
logger.info("file {} transfer progress: ", progress);
} else {
logger.info("file transfer progress: {}/{}", progress, total);
}
}
@Override
public void operationComplete(ChannelProgressiveFuture channelProgressiveFuture) throws Exception {
System.out.println("文件传输完成");
raf.close();
}
});
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
private void detectTask(ChannelHandlerContext ctx, String uri) throws IOException {
String fileName = uri.split("key=")[1];
String dateStr;
try {
String yyMMdd = fileName.split("\\.")[4];
dateStr = "20" + yyMMdd.substring(0, 6) + "/";
} catch (Exception exception) {
logger.error("detect 文件命名不正确;准备返回空,fileName={}", fileName);
HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
return;
}
String path = Configuration.FILE_UPLOAD_PATH + dateStr + fileName + ".log";
logger.info("start HttpDataHandlerAdapter.detectTask download or see => file path :{}", path);
File file = new File(CleanPathUtil.cleanString(path));
if (!file.exists()) {
logger.error("探测任务未找到指定文件:{}", path);
HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
return;
}
if (uri.contains("down") && uri.contains("key")) {
HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
RandomAccessFile raf = new RandomAccessFile(file, "r");
long fileLength = raf.length();
resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
resp.headers().set(HttpHeaderNames.CONTENT_DISPOSITION, "attachment;filename=\"" + URLEncoder.encode(path, "UTF-8") + "\";");
ctx.write(resp);
ctx.write(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), ctx.newProgressivePromise());
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
} else if (uri.contains("see") && uri.contains("key")) {
FileReader fileReader = new FileReader(CleanPathUtil.cleanString(path));
StringBuilder stringBuffer = new StringBuilder();
BufferedReader bufferedReader = new BufferedReader(fileReader);
int line = 0;
stringBuffer.append("<div>");
while (line <= 50) {
String s = bufferedReader.readLine();
if (null == s) {
break;
}
s = s.replace(" ", " ");
s = s.replace("\t", " ");
stringBuffer.append("<p>").append(s).append("</p>");
line++;
}
stringBuffer.append("</div>");
HttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(stringBuffer.toString().getBytes()));
resp.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain;charset=UTF-8");
ctx.writeAndFlush(resp);
ctx.close();
} else {
logger.error(" end HttpDataHandlerAdapter.detectTask 不识别的探测流表获取方式");
}
}
}
Pipeline
Pipeline 指 Netty 中处理网络事件的流水线机制,它基于责任链设计模式设计,内部是一个双向链表结构,能够支持动态地添加和删除Handler业务处理器。
当 Channel 事件触发后,该事件会沿着 Pipeline 传递,被相应的 ChannelHandler 处理。
java
public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast("http-decoder", new HttpRequestDecoder());
channel.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
channel.pipeline().addLast("http-encoder", new HttpResponseEncoder());
channel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
/**
* 自定义ChannelInboundHandlerAdapter
*/
channel.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter());
}
}
启动类
java
public class NettyPracticeApplication{
@SneakyThrows
public static void main(String[] args ) {
new BootNettyServer().bind(8088);
}
}
服务端
java
public class BootNettyServer {
public void bind(int port) throws Exception {
/**
* 配置服务端的NIO线程组
* NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
* bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写,
* bossGroup接收到连接后就会把连接信息注册到workerGroup
* workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
/**
* ServerBootstrap 是一个启动NIO服务的辅助启动类
*/
ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* 设置group,将bossGroup, workerGroup线程组传递到ServerBootstrap
*/
serverBootstrap = serverBootstrap.group(bossGroup, workerGroup);
/**
* ServerSocketChannel是以NIO的selector为基础进行实现的,用来接收新的连接,这里告诉Channel通过NioServerSocketChannel获取新的连接
*/
serverBootstrap = serverBootstrap.channel(NioServerSocketChannel.class);
/**
* option是设置 bossGroup,childOption是设置workerGroup
* netty 默认数据包传输大小为1024字节, 设置它可以自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费最小初始化最大 (根据生产环境实际情况来定)
* 使用对象池,重用缓冲区
*/
serverBootstrap = serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
serverBootstrap = serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
/**
* 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
*/
serverBootstrap = serverBootstrap.childHandler(new BootNettyChannelInitializer<SocketChannel>());
log.info("netty server start success!");
/**
* 绑定端口,同步等待成功
*/
ChannelFuture f = serverBootstrap.bind(port).sync();
/**
* 等待服务器监听端口关闭
*/
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
} finally {
/**
* 退出,释放线程池资源
*/
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}