用 vert.x web 理解 http chunk 传输


最近有个场景是后台查询大量数据返回前端,根据数据规律决定使用chunk方式流返回.

服务端使用vert.x web,客户端使用hutool工具提供的HttpUtil进行演示

基础代码

先上基础代码

服务端

public class Server {
  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    Router router = Router.router(vertx);
    vertx.createHttpServer().requestHandler(router).listen(10789);
    router.get("/test/chunk").handler(routingContext -> {
      System.out.println("收到请求");
      HttpServerResponse response = routingContext.response();
      response.setChunked(true);
      AtomicInteger i = new AtomicInteger();
      vertx.setPeriodic(1000, id -> {
        response.write(LocalDateTime.now().toString());
        response.write("\n");
        if (i.getAndIncrement() == 10) {
          vertx.cancelTimer(id);
          System.out.println("输出完毕");
          response.end();//服务端发送结束符
        }
      });
    });

  }
}

客户端

public class Client {
  public static void main(String[] args) {
    HttpResponse httpResponse = HttpUtil.createGet("http://127.0.0.1:10789/test/chunk").executeAsync();
    if (!httpResponse.isOk()) {
      System.out.println(httpResponse.body());
      return;
    }
    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.bodyStream()));
    String str;
    try {
      while (StrUtil.isNotEmpty(str = bufferedReader.readLine())) {
        System.out.println(str);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }

  }
}

运行结果

client运行1

运行流程

服务端收到请求后,每秒输出当前时间;
输出10次后停止,response写出结束符.

客户端通过异步请求服务端接口,获取流InputStream,
按行读取缓冲,输出内容.

http协议

http协议连接是一种无状态连接,http的数据包分成请求头和请求体,响应包分别是响应头和响应体.
客户端在读取响应内容时,根据响应头时,根据content-length参数确认响应体的长度,读取到对应长度的内容,客户端就可以认为本次请求已经完成,已获取到所有内容.否则就一直等待服务端写出数据,直到缓冲超时.

chunk传输

http请求响应时,服务端在返回请求头信息时,无法得知需要返回的数据的真实长度.一般发生在动态信息更新(sse推送),异步接口请求等情形.这时就无法使用content-length来通知client端.可以使用transfer-encoding: chunked响应头来告诉客户端这是一个长连接,client就会一直等待接收服务端的响应数据.直到收到两个换行符\r\n组成的结束符,才会认为本次请求已经结束.

当客户端向服务器请求一个静态页面或者一张图片时,服务器可以很清楚的知道内容大小,然后通过Content-length消息首部字段告诉客户端需要接收多少数据。但是如果是动态页面等时,服务器是不可能预先知道内容大小,这时就可以使用Transfer-Encoding:chunk模式来传输数据了。即如果要一边产生数据,一边发给客户端,服务器就需要使用”Transfer-Encoding: chunked”这样的方式来代替Content-Length。
chunk编码将数据分成一块一块的发生。Chunked编码将使用若干个Chunk串连而成,由一个标明长度为0的chunk标示结束。每个Chunk分为头部和正文两部分,头部内容指定正文的字符总数(十六进制的数字)和数量单位(一般不写),正文部分就是指定长度的实际内容,两部分之间用回车换行(CRLF)隔开。在最后一个长度为0的Chunk中的内容是称为footer的内容,是一些附加的Header信息(通常可以直接忽略)。

问题:客户端断开

当服务端还在返回,客户端已经因为自身原因断开连接,服务端还会继续发送剩余数据,因为在一次http请求通讯中,客户端无法主动通知服务器断开连接
修改服务端代码,给服务端添加日志观察请求

vertx.setPeriodic(1000, id -> {
  String s = LocalDateTime.now().toString();
  System.out.println("输出中:" + s);
  response.write(s);
  response.write("\n");
  if (i.getAndIncrement() == 10) {
    vertx.cancelTimer(id);
    System.out.println("输出完毕");
    response.end();//服务端发送结束符
  }
});

可以观察到在客户端程序停止后,服务端仍然继续返回内容,这些内容会因为链接失效进入网卡缓冲后,再被删除,会照成性能的影响.
如果从http本身来说,是无法感知客户端当前的状态,vert.x提供一系列回调函数用来监听请求的通讯状态.

response.closeHandler(close -> System.out.println("response close"));
response.endHandler(end -> System.out.println("response end"));
response.exceptionHandler(event -> {
  System.out.println("response exception");
  event.printStackTrace();
});

response.closeHandler 当连接被关闭时的回调
response.endHandler 当连接响应结束时的回调
response.exceptionHandler 以外发生时的回调
再进行测试
先看正常情况


运行中途停止客户端

如果中途client断开连接,会触发回调函数,通知服务端连接发生变化.可以从这些监听中进行处理,停止后台业务处理,释放对应线程.不做无用功.

public class Server {
  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    Router router = Router.router(vertx);
    vertx.createHttpServer().requestHandler(router).listen(10789);
    router.get("/test/chunk").handler(routingContext -> {
      System.out.println("收到请求");
      HttpServerResponse response = routingContext.response();
      response.setChunked(true);
      AtomicInteger i = new AtomicInteger();
      AtomicReference<Long> timerId = new AtomicReference<>();
      vertx.setPeriodic(1000, id -> {
        timerId.set(id);
        String s = LocalDateTime.now().toString();
        System.out.println("输出中:" + s);
        response.write(s);
        response.write("\n");
        if (i.getAndIncrement() == 10) {
          vertx.cancelTimer(timerId.get());
          System.out.println("输出完毕");
          response.end();//服务端发送结束符
        }
      });
      response.closeHandler(close -> System.out.println("response close"));
      response.endHandler(end -> System.out.println("response end"));
      response.exceptionHandler(event -> {
        System.out.println("response exception");
        vertx.cancelTimer(timerId.get());
        event.printStackTrace();

      });
    });

  }
}

对基于http协议的理解加对response.exceptionHandler的源码进行简单分析,很很容易就能发现原理是对tcp连接状态进行监控,实际抛出的异常也是连接关闭的异常Connection was closed.


文章作者: 鱍鱍
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 鱍鱍 !
  目录