前言
- 前言
- 对外接口
- 整体结构
- 使用ApnsChannelFactory封装Bootstrap
- ApnsChannelPool.acquire
- channel容器——ChannelGroup
- 最佳实践
- 其它
relayrides/pushy是一个 Java library for sending APNs (iOS, OS X, and Safari) push notifications. 揉和了netty和http2协议。因此,除了它本身的功能外,从中也可以学到许多对netty框架以及http2协议的使用技巧。
对外接口
一次消息的发送实例
// 构建payload
ApnsPayloadBuilder payloadBuilder = new ApnsPayloadBuilder();
payloadBuilder.setAlertBody("Example!");
String payload = payloadBuilder.buildWithDefaultMaximumLength();
String token = TokenUtil.sanitizeTokenString("<efc7492 bdbd8209>");
// 构建推送model
SimpleApnsPushNotification pushNotification = new SimpleApnsPushNotification(token, "com.example.myApp", payload);
// 发送
Future<PushNotificationResponse<SimpleApnsPushNotification>> sendNotificationFuture = apnsClient.sendNotification(pushNotification);
几乎看不到netty 的痕迹,而传统的netty 客户端启动代码
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>(){
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
可以看到,两者差异还是蛮大的。
整体结构
send notification 流程
netty对外提供的操作接口有以下几个问题,当然,这也不怨netty
- 操作对象有多个,比如bootstrap、channel。并且,操作对象间存在依赖关系,channel不是直接初始化,而是通过bootstrap获得。
- netty启动逻辑复杂,必须由用户显式编写代码。
-
操作接口不简洁,对于io操作,简介的接口应该是
- 同步,
response send(request)
- 异步,
Future<response> send(request)
而netty则无明确封装
- 同步,
通过pushy,我们可以学习到如何封装netty,同时学习如何以纯异步的方式新增自定义逻辑,比如线程池等。
使用ApnsChannelFactory封装Bootstrap
A Bootstrap that makes it easy to bootstrap a Channel to use for clients. 可见,Bootstrap 就是为了创建Channel,我们可以将创建channel的部分分两步
- 创建并配置Bootstrap
- 使用Bootstrap 创建channel
ApnsChannelFactory构造方法 构建Bootstrap
ApnsChannelFactory(final SslContext sslContext, ...
final EventLoopGroup eventLoopGroup) {
this.sslContext = sslContext;
this.addressResolverGroup = ...
this.bootstrapTemplate = new Bootstrap();
this.bootstrapTemplate.group(eventLoopGroup);
this.bootstrapTemplate.option(ChannelOption.TCP_NODELAY, true);
this.bootstrapTemplate.remoteAddress(apnsServerAddress);
this.bootstrapTemplate.resolver(this.addressResolverGroup);
this.bootstrapTemplate.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
this.bootstrapTemplate.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel channel) {
final ChannelPipeline pipeline = channel.pipeline();
if (proxyHandlerFactory != null) {
pipeline.addFirst(proxyHandlerFactory.createProxyHandler());
}
final SslHandler sslHandler = sslContext.newHandler(channel.alloc());
sslHandler.handshakeFuture().addListener(...);
pipeline.addLast(sslHandler);
pipeline.addLast(ConnectionNegotiationErrorHandler.INSTANCE);
}
});
}
ApnsChannelFactory.create 调用Bootstrap 创建channel
public Future<Channel> create(final Promise<Channel> channelReadyPromise) {
final long delay = this.currentDelaySeconds.get();
this.bootstrapTemplate.config().group().schedule(new Runnable() {
public void run() {
final Bootstrap bootstrap = ApnsChannelFactory.this.bootstrapTemplate.clone()
.channelFactory(...);
final ChannelFuture connectFuture = bootstrap.connect();
connectFuture.addListener(...);
connectFuture.channel().closeFuture().addListener(...);
}
}, delay, TimeUnit.SECONDS);
return channelReadyPromise;
}
ApnsChannelPool.acquire
ApnsChannelPool 比较关键的几个字段,决定了复用现有的channel 还是执行channelFactory.create
- capacity,ApnsChannelPool 的最大连接数
- allChannels,ChannelGroup 真正持有 channel的容器
- pendingCreateChannelFutures,持有
channelFactory.create
返回的Future<Channel>
即当前正在创建的 channel
channel容器——ChannelGroup
A thread-safe Set that contains open Channels and provides various bulk operations on them. 管理所有channel,提供批量操作,比如ChannelGroupFuture write(Object message)
的实现便是 遍历所有channel并执行channel.write
A closed Channel is automatically removed from the collection, so that you don’t need to worry about the life cycle of the added Channel.
private final ChannelFutureListener remover = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
remove(future.channel());
}
};
ChannelGroup 监听了所管理的每一个channel的close 事件
public boolean add(Channel channel) {
ConcurrentMap<ChannelId, Channel> map = xx;
boolean added = map.putIfAbsent(channel.id(), channel) == null;
if (added) {
channel.closeFuture().addListener(remover);
}
...
return added;
}
最佳实践
2018.6.18 补充 relayrides/pushy
-
Flow control。pushy 是纯异步接口,pushy 有个控制,实现原理估计是1500个future 未返回时,则将后续的请求 缓冲起来,等等这个1500个inflight 的reqeust。
当数据量很大时,必然触发1500 的上线,进而大量请求 缓存在netty buffer 中,然后耗尽内存。此时,需要一个 flow control layer ,可以使用CountDown 或 Semaphore
其它
从pushy 中我们可以学习到,如何去写一个纯异步框架