1 分钟阅读

前言

relayrides/pushy是一个 Java library for sending APNs (iOS, OS X, and Safari) push notifications. 揉和了netty和http2协议。因此,除了它本身的功能外,从中也可以学到许多对netty框架以及http2协议的使用技巧。

对外接口

一次消息的发送实例

// 构建payload
ApnsPayloadBuilder payloadBuilder = new ApnsPayloadBuilder();
payloadBuilder.setAlertBody("Example!");
String payload = payloadBuilder.buildWithDefaultMaximumLength();
String token = TokenUtil.sanitizeTokenString("<efc7492 bdbd8209>");
// 构建推送model
SimpleApnsPushNotification  pushNotification = new SimpleApnsPushNotification(token, "com.example.myApp", payload);
// 发送
Future<PushNotificationResponse<SimpleApnsPushNotification>> sendNotificationFuture = apnsClient.sendNotification(pushNotification);

几乎看不到netty 的痕迹,而传统的netty 客户端启动代码

EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>(){
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new EchoClientHandler());
            }
        });
    ChannelFuture f = b.connect(host, port).sync();
    f.channel().closeFuture().sync();
} finally {
    group.shutdownGracefully();
}

可以看到,两者差异还是蛮大的。

整体结构

send notification 流程

netty对外提供的操作接口有以下几个问题,当然,这也不怨netty

  1. 操作对象有多个,比如bootstrap、channel。并且,操作对象间存在依赖关系,channel不是直接初始化,而是通过bootstrap获得。
  2. netty启动逻辑复杂,必须由用户显式编写代码。
  3. 操作接口不简洁,对于io操作,简介的接口应该是

    1. 同步,response send(request)
    2. 异步,Future<response> send(request)

    而netty则无明确封装

通过pushy,我们可以学习到如何封装netty,同时学习如何以纯异步的方式新增自定义逻辑,比如线程池等。

使用ApnsChannelFactory封装Bootstrap

A Bootstrap that makes it easy to bootstrap a Channel to use for clients. 可见,Bootstrap 就是为了创建Channel,我们可以将创建channel的部分分两步

  1. 创建并配置Bootstrap
  2. 使用Bootstrap 创建channel

ApnsChannelFactory构造方法 构建Bootstrap

ApnsChannelFactory(final SslContext sslContext, ...
                    final EventLoopGroup eventLoopGroup) {
    this.sslContext = sslContext;
    this.addressResolverGroup = ...
    this.bootstrapTemplate = new Bootstrap();
    this.bootstrapTemplate.group(eventLoopGroup);
    this.bootstrapTemplate.option(ChannelOption.TCP_NODELAY, true);
    this.bootstrapTemplate.remoteAddress(apnsServerAddress);
    this.bootstrapTemplate.resolver(this.addressResolverGroup);
    this.bootstrapTemplate.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
    this.bootstrapTemplate.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(final SocketChannel channel) {
            final ChannelPipeline pipeline = channel.pipeline();
            if (proxyHandlerFactory != null) {
                pipeline.addFirst(proxyHandlerFactory.createProxyHandler());
            }
            final SslHandler sslHandler = sslContext.newHandler(channel.alloc());
            sslHandler.handshakeFuture().addListener(...);
            pipeline.addLast(sslHandler);
            pipeline.addLast(ConnectionNegotiationErrorHandler.INSTANCE);
        }
    });
}

ApnsChannelFactory.create 调用Bootstrap 创建channel

public Future<Channel> create(final Promise<Channel> channelReadyPromise) {
    final long delay = this.currentDelaySeconds.get();
    this.bootstrapTemplate.config().group().schedule(new Runnable() {
        public void run() {
            final Bootstrap bootstrap = ApnsChannelFactory.this.bootstrapTemplate.clone()
                    .channelFactory(...);
            final ChannelFuture connectFuture = bootstrap.connect();
            connectFuture.addListener(...);
            connectFuture.channel().closeFuture().addListener(...);
        }
    }, delay, TimeUnit.SECONDS);

    return channelReadyPromise;
}

ApnsChannelPool.acquire

ApnsChannelPool 比较关键的几个字段,决定了复用现有的channel 还是执行channelFactory.create

  1. capacity,ApnsChannelPool 的最大连接数
  2. allChannels,ChannelGroup 真正持有 channel的容器
  3. pendingCreateChannelFutures,持有channelFactory.create返回的Future<Channel> 即当前正在创建的 channel

channel容器——ChannelGroup

A thread-safe Set that contains open Channels and provides various bulk operations on them. 管理所有channel,提供批量操作,比如ChannelGroupFuture write(Object message) 的实现便是 遍历所有channel并执行channel.write

A closed Channel is automatically removed from the collection, so that you don’t need to worry about the life cycle of the added Channel.

private final ChannelFutureListener remover = new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        remove(future.channel());
    }
};

ChannelGroup 监听了所管理的每一个channel的close 事件

public boolean add(Channel channel) {
    ConcurrentMap<ChannelId, Channel> map = xx;
    boolean added = map.putIfAbsent(channel.id(), channel) == null;
    if (added) {
        channel.closeFuture().addListener(remover);
    }
    ...
    return added;
}

最佳实践

2018.6.18 补充 relayrides/pushy

  1. Flow control。pushy 是纯异步接口,pushy 有个控制,实现原理估计是1500个future 未返回时,则将后续的请求 缓冲起来,等等这个1500个inflight 的reqeust。

    当数据量很大时,必然触发1500 的上线,进而大量请求 缓存在netty buffer 中,然后耗尽内存。此时,需要一个 flow control layer ,可以使用CountDown 或 Semaphore

其它

从pushy 中我们可以学习到,如何去写一个纯异步框架

标签:

分类:

更新时间:

留下评论