Java NIO之选择器

1.简介

前面的文章说了缓冲区,说了通道,本文就来说说 NIO 中另一个重要的实现,即选择器 Selector。在更早的文章中,我简述了几种 IO 模型。如果大家看过之前的文章,并动手写过代码的话。再看 Java 的选择器大概就会知道它是什么了,以及怎么用了。选择器是 Java 多路复用模型的一个实现,可以同时监控多个非阻塞套接字通道。示意图大致如下:

如果大家了解过多路复用模型,那应该也会知道几种复用模型的实现。比如 select,poll 以及 Linux 下的 epoll 和 BSD 下的 kqueue。Java 的选择器并非凭空创造,而是在底层操作系统提供的接口的基础上封装而来。相关的细节,我随后会进行分析。

关于 Java 选择器的简介这里先说到这,接下来进入正题。

2.基本操作及实现

本章我将对 Selector 的创建,通道的注册,Selector 的选择过程进行分析。内容篇幅较大,希望大家耐心看完。由于 Selector 相关类在不同操作系统下的实现是不同的,加之个人对 Linux epoll 更为熟悉,所以本文所分析的源码也是和 epoll 相关的。好了,进入正题吧。

2.1 创建选择器

选择器 Selector 是一个抽象类,所以不能直接创建。Selector 提供了一个 open 方法,通过 open 方法既可以创建选择器实例。示例代码如下:

1
Selector selector = Selector.open();

上面的代码比较简单,只有一行。不过不要被表象迷惑,这行代码仅是完整实现的冰山一角,更复杂的逻辑则隐藏在水面之下。
在简介一节,我已经说了 Java 选择器是对底层多路复用接口的一个包装,这里的 open 方法也不例外。假设我们的 Java 运行在 Linux 平台下,那么 open 最终所做的事情应该是调用操作系统的epoll_create函数,用于创建 epoll 实例。真实情况是不是如此呢?答案就在冰山深处,接下来就让我们一起去求索吧。下面我们将沿着 open 方法一路走下去,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
public abstract class Selector implements Closeable {
public static Selector open() throws IOException {
// 创建 SelectorProvider,再通过其 openSelector 方法创建 Selector
return SelectorProvider.provider().openSelector();
}
// 省略无关代码
}

public abstract class SelectorProvider {
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 创建默认的 SelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
}

public class DefaultSelectorProvider {
private DefaultSelectorProvider() { }

/**
* 根据系统名称创建相应的 SelectorProvider
*/
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");

//
return new sun.nio.ch.PollSelectorProvider();
}

/**
* 加载 SelectorProvider 类,并创建实例
*/
@SuppressWarnings("unchecked")
private static SelectorProvider createProvider(String cn) {
Class<SelectorProvider> c;
try {
c = (Class<SelectorProvider>)Class.forName(cn);
} catch (ClassNotFoundException x) {
throw new AssertionError(x);
}
try {
return c.newInstance();
} catch (IllegalAccessException | InstantiationException x) {
throw new AssertionError(x);
}

}
}

/**
* 创建完 SelectorProvider,接下来要调用 openSelector 方法
* 创建 Selector 的继承类了。
*/
public class EPollSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
}

class EPollSelectorImpl extends SelectorImpl {
EPollSelectorImpl(SelectorProvider sp) throws IOException {
// 调用父类构造方法
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;

// 创建 EPollArrayWrapper,EPollArrayWrapper 是一个重要的实现
pollWrapper = new EPollArrayWrapper();

pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
}

public abstract class SelectorImpl extends AbstractSelector {
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();

/* 初始化 publicKeys 和 publicSelectedKeys,
* publicKeys 即 selector.keys() 方法所返回的集合,
* publicSelectedKeys 则是 selector.selectedKeys() 方法返回的集合
*/
if (Util.atBugLevel("1.4")) {
publicKeys = keys;
publicSelectedKeys = selectedKeys;
} else {
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}
}

/**
* EPollArrayWrapper 一个重要的实现,这一层再往下就是 C 代码了
*/
class EPollArrayWrapper {
EPollArrayWrapper() throws IOException {
// 调用 epollCreate 方法创建 epoll 文件描述符
epfd = epollCreate();

// the epoll_event array passed to epoll_wait
// 初始化 pollArray,该对象用于存储就绪文件描述符和事件
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();

// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}

// epollCreate 方法是 native 类型的
private native int epollCreate();
}

以上代码时 Java 层面的,Java 层调用栈最下面的类是 EPollArrayWrapper(源码路径可以在附录中查找)。EPollArrayWrapper 是一个重要的实现,起着承上启下的作用。上层是 Java 代码,下层是 C 代码。上层的代码看完了,接下来看看冰山深处的 C 代码:

1
2
3
4
5
6
7
8
9
10
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
// 调用 epoll_create 函数创建 epoll 实例,并返回文件描述符 epfd
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}

上面的代码很简单,仅做了创建 epoll 实例这一件事。看到这里,答案就明了了。最后在附一张时序图帮助大家理清代码调用顺序,如下:

Selector.open

2.2 选择键

2.2.1 几种事件

选择键 SelectionKey 包含4种事件,分别是:

1
2
3
4
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

事件之间可以通过或运算进行组合,比如:

1
int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

2.2.2 两种事件集合:interestOps 和 readyOps

interestOps 即感兴趣的事件集合,通道调用 register 方法注册时会设置此值,interestOps 可通过 SelectionKey interestOps() 方法获取。readyOps 是就绪事件集合,可通过 SelectionKey readyOps() 获取。

interestOps 和 readyOps 被声明在 SelectionKey 子类 SelectionKeyImpl 中,代码如下:

1
2
3
4
public class SelectionKeyImpl extends AbstractSelectionKey {
private volatile int interestOps;
private int readyOps;
}

接下来再来看看与 readyOps 事件集合相关的几个方法,如下:

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

以上方法从字面意思上就可以知道有什么用,这里就不解释了。接下来以 isReadable 方法为例,简单看一下这个方法是如何实现。

1
2
3
public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
}

上面说到可以通过或运算组合事件,这里则是通过与运算来测试某个事件是否在事件集合中。比如

1
2
3
readyOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE = 0101
readyOps & OP_READ = 0101 & 0001 = 0001
readyOps & OP_CONNECT = 0101 & 1000 = 0

readyOps & OP_READ != 0,所以 OP_READ 在事件集合中。readyOps & OP_CONNECT == 0,所以 OP_CONNECT 不在事件集合中。

2.2.3 attach 方法

attach 是一个好用的方法,通过这个方法,可以将对象暂存在 SelectionKey 中,待需要的时候直接取出来即可。比如本文对应的练习代码实现了一个简单的 HTTP 服务器,在读取用户请求数据后(即 selectionKey.isReadable() 为 true),会去解析请求头,然后将请求头信息通过 attach 方法放入 selectionKey 中。待通道可写后,再从 selectionKey 中取出请求头,并根据请求头回复客户端不同的消息。当然,这只是一个应用场景,attach 可能还有其他的应用场景,比如标识通道。不过其他的场景我没使用过,就不说了。attach 使用方式如下:

1
2
selectionKey.attach(obj);
Object attachedObj = selectionKey.attachment();

2.3 通道注册

通道注册即将感兴趣的事件告知 Selector,待事件发生时,Selector 即可返回就绪事件,我们就可以去做后续的事情了。比如 ServerSocketChannel 通道通常对 OP_ACCEPT 事件感兴趣,那么我们就可以把这个事件注册给 Selector。待事件发生,即服务端接受客户端连接后,我们即可获取这个就绪的事件并做相应的操作。通道注册的示例代码如下:

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

起初我以为通道注册操作会调用操作系统的 epoll_ctl 函数,但最终通过看源码,发现自己的理解是错的。既然通道注册阶段不调用 epoll_ctl 函数。那么,epoll_ctl 什么时候才会被调用呢?如果不调用 epoll_ctl,那么注册过程都干了什么事情呢?关于第一个问题,本节还无法解答,不过第二个问题则可以说说。接下来让我们深入通道类 register 方法的调用栈中去探寻答案吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {
return register(sel, ops, null);
}

public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
}

public abstract class AbstractSelectableChannel extends SelectableChannel {

private SelectionKey[] keys = null;

public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
synchronized (regLock) {
// 省去一些校验代码

// 从 keys 数组中查找,查找条件为 k.selector() == sel
SelectionKey k = findKey(sel);

// 如果 k 不为空,则修改 k 所感兴趣的事件
if (k != null) {
k.interestOps(ops);
k.attach(att);
}

// k 为空,则创建一个 SelectionKey,并存储到 keys 数组中
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
}

public abstract class AbstractSelector extends Selector {
protected abstract SelectionKey register(AbstractSelectableChannel ch,
int ops, Object att);
}

public abstract class SelectorImpl extends AbstractSelector {
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
// 创建 SelectionKeyImpl 实例
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
}

class EPollSelectorImpl extends SelectorImpl {
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
// 存储 fd 和 SelectionKeyImpl 的映射关系
fdToKey.put(fd, ski);

pollWrapper.add(fd);
// 将 SelectionKeyImpl 实例存储到 keys 中(这里的 keys 声明在 SelectorImpl 类中),keys 集合可由 selector.keys() 方法获取
keys.add(ski);
}
}

public class SelectionKeyImpl extends AbstractSelectionKey {
public SelectionKey interestOps(int ops) {
ensureValid();
return nioInterestOps(ops);
}

public SelectionKey nioInterestOps(int ops) {
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
// 转换并设置感兴趣的事件
channel.translateAndSetInterestOps(ops, this);
// 设置 interestOps 变量
interestOps = ops;
return this;
}
}

class SocketChannelImpl extends SocketChannel implements SelChImpl {
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
// 转换事件
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= PollArrayWrapper.POLLIN;
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= PollArrayWrapper.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= PollArrayWrapper.POLLCONN;
// 设置事件
sk.selector.putEventOps(sk, newOps);
}
}

class class EPollSelectorImpl extends SelectorImpl {
public void putEventOps(SelectionKeyImpl ski, int ops) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
// 设置感兴趣的事件
pollWrapper.setInterest(ch.getFDVal(), ops);
}
}

class EPollArrayWrapper {
void setInterest(int fd, int mask) {
synchronized (updateLock) {
// 扩容 updateDescriptors 数组,并存储文件描述符 fd
int oldCapacity = updateDescriptors.length;
if (updateCount == oldCapacity) {
int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
int[] newDescriptors = new int[newCapacity];
System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
updateDescriptors = newDescriptors;
}
updateDescriptors[updateCount++] = fd;

// events are stored as bytes for efficiency reasons
byte b = (byte)mask;
assert (b == mask) && (b != KILLED);
// 存储事件
setUpdateEvents(fd, b, false);
}
}

private void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
}

到 setUpdateEvents 这个方法,整个调用栈就结束了。但是我们并未在调用栈中看到调用 epoll_ctl 函数的地方,也就是说,通道注册时,并不会立即调用 epoll_ctl,而是先将事件集合 events 存放在 eventsLow。至于 epoll_ctl 函数何时调用的,需要大家继续往下看了。

2.4 选择过程

2.4.1 选择方法

Selector 包含3种不同功能的选择方法,分别如下:

  • int select()
  • int select(long timeout)
  • int selectNow()

select() 是一个阻塞方法,仅在至少一个通道处于就绪状态时才返回。
select(long timeout) 同样也是阻塞方法,不过可对该方法设置超时时间(timeout > 0),使得线程不会被一直阻塞。如果 timeout = 0,会一直阻塞线程。
selectNow() 为非阻塞方法,调用后立即返回。

以上3个方法均返回 int 类型值,表示每次调用 select 或 selectNow 方法后,新就绪通道的数量。如果某个通道在上一次调用 select 方法时就已经处于就绪状态,但并未将该通道对应的 SelectionKey 对象从 selectedKeys 集合中移除。假设另一个的通道在本次调用 select 期间处于就绪状态,此时,select 返回1,而不是2。

2.4.2 选择过程

选择方法用起来虽然简单,但方法之下隐藏的逻辑还是比较复杂的。大致分为下面几个步骤:

  1. 检查已取消键集合 cancelledKeys 是否为空,不为空则将 cancelledKeys 的键从 keys 和 selectedKeys 中移除,并将键和通道注销。
  2. 调用操作系统的 epoll_ctl 函数将通道感兴趣的事件注册到 epoll 实例中
  3. 调用操作系统的 epoll_wait 函数监听事件
  4. 再次执行步骤1
  5. 更新 selectedKeys 集合,并返回就绪通道数量

上面五个步骤对应于 EPollSelectorImpl 类中 doSelect 方法的逻辑,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
// 处理已取消键集合,对应步骤1
processDeregisterQueue();
try {
begin();
// select 方法的核心,对应步骤2和3
pollWrapper.poll(timeout);
} finally {
end();
}
// 处理已取消键集合,对应步骤4
processDeregisterQueue();

// 更新 selectedKeys 集合,并返回就绪通道数量,对应步骤5
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}

接下来,我们按照上面的步骤顺序去分析代码实现。先来看看步骤1对应的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
+----SelectorImpl.java
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
// 遍历 cancelledKeys,执行注销操作
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
// 执行注销逻辑
implDereg(ski);
} catch (SocketException se) {
throw new IOException("Error deregistering key", se);
} finally {
i.remove();
}
}
}
}
}

+----EPollSelectorImpl.java
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0);
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
// 移除 fd 和选择键键的映射关系
fdToKey.remove(Integer.valueOf(fd));
// 从 epoll 实例中删除事件
pollWrapper.remove(fd);
ski.setIndex(-1);

// 从 keys 和 selectedKeys 中移除选择键
keys.remove(ski);
selectedKeys.remove(ski);

// 注销选择键
deregister((AbstractSelectionKey)ski);

// 注销通道
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}

上面的代码代码逻辑不是很复杂,首先是获取 cancelledKeys 集合,然后遍历集合,并对每个选择键及其对应的通道执行注销操作。接下来再来看看步骤2和3对应的代码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
+----EPollArrayWrapper.java
int poll(long timeout) throws IOException {
// 调用 epoll_ctl 函数注册事件,对应步骤3
updateRegistrations();

// 调用 epoll_wait 函数等待事件发生,对应步骤4
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}

/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
// 获取 fd 和 events,这两个值在调用 register 方法时被存储到数组中
int fd = updateDescriptors[j];
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;

if (events != KILLED) {
// 确定 opcode 的值
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 注册事件
epollCtl(epfd, opcode, fd, events);
// 设置 fd 的注册状态
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}

// 下面两个均是 native 方法
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
}

看到 updateRegistrations 方法的实现,大家现在知道 epoll_ctl 这个函数是在哪里调用的了。在 3.2 节通道注册的结尾给大家埋了一个疑问,这里就是答案了。注册通道实际上只是先将事件收集起来,等调用 select 方法时,在一起通过 epoll_ctl 函数将事件注册到 epoll 实例中。

上面 epollCtl 和 epollWait 方法是 native 类型的,接下来我们再来看看这两个方法是如何实现的。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
+----EPollArrayWrapper.c
JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) {
struct epoll_event event;
int res;

event.events = events;
event.data.fd = fd;

// 调用 epoll_ctl 注册事件
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}

JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) {
struct epoll_event *events = jlong_to_ptr(address);
int res;

if (timeout <= 0) { /* Indefinite or no wait */
// 调用 epoll_wait 等待事件
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}

if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}

上面的C代码没什么复杂的逻辑,这里就不多说了。如果大家对 epoll_ctl 和 epoll_wait 函数不了解,可以参考 Linux man-page。关于 epoll 的示例,也可以参考我的另一篇文章“基于epoll实现简单的web服务器”

说完步骤2和3对应的代码,接下来再来说说步骤4和5。由于步骤4和步骤1是一样的,这里不再赘述。最后再来说说步骤5的逻辑。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
+----EPollSelectorImpl.java
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
/* 从 pollWrapper 成员变量的 pollArray 中获取文件描述符,
* pollArray 中的数据由 epoll_wait 设置
*/
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
// 从 pollArray 中获取就绪事件集合
int rOps = pollWrapper.getEventOps(i);

/* 如果 selectedKeys 已包含选择键,则选择键必须由新的事件发生时,
* 才会将 numKeysUpdated + 1
*/
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
// 转换并设置就绪事件集合
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
// 更新 selectedKeys 集合,并将 numKeysUpdated + 1
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}

// 返回 numKeysUpdated
return numKeysUpdated;
}

+----SocketChannelImpl.java
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
int oldOps = sk.nioReadyOps();
int newOps = initialOps;

if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
return false;
}

if ((ops & (PollArrayWrapper.POLLERR
| PollArrayWrapper.POLLHUP)) != 0) {
newOps = intOps;
sk.nioReadyOps(newOps);
// No need to poll again in checkConnect,
// the error will be detected there
readyToConnect = true;
return (newOps & ~oldOps) != 0;
}

/*
* 转换事件
*/
if (((ops & PollArrayWrapper.POLLIN) != 0) &&
((intOps & SelectionKey.OP_READ) != 0) &&
(state == ST_CONNECTED))
newOps |= SelectionKey.OP_READ;

if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
((intOps & SelectionKey.OP_CONNECT) != 0) &&
((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
newOps |= SelectionKey.OP_CONNECT;
readyToConnect = true;
}

if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
((intOps & SelectionKey.OP_WRITE) != 0) &&
(state == ST_CONNECTED))
newOps |= SelectionKey.OP_WRITE;

// 设置事件
sk.nioReadyOps(newOps);

// 如果新的就绪事件和老的就绪事件不相同,则返回true,否则返回 false
return (newOps & ~oldOps) != 0;
}

上面就是步骤5的逻辑了,简单总结一下。首先是获取就绪通道数量,然后再获取这些就绪通道对应的文件描述符 fd,以及就绪事件集合 rOps。之后调用 translateAndSetReadyOps 转换并设置就绪事件集合。最后,将选择键添加到 selectedKeys 集合中,并累加 numKeysUpdated 值,之后返回该值。

以上就是选择过程的代码讲解,贴了不少代码,可能不太好理解。Java NIO 和操作系统接口关联比较大,所以在学习 NIO 相关原理时,也应该去了解诸如 epoll 等系统调用的知识。没有这些背景知识,很多东西看起来不太好懂。好了,本节到此结束。

2.5 模板代码

使用 NIO 选择器编程时,主干代码的结构一般比较固定。所以把主干代码写好后,就可以往里填业务代码了。下面贴一个服务端的模板代码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("localhost", 8080));
ssc.configureBlocking(false);

Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);

while(true) {
int readyNum = selector.select();
if (readyNum == 0) {
continue;
}

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();

while(it.hasNext()) {
SelectionKey key = it.next();

if(key.isAcceptable()) {
// 接受连接
} else if (key.isReadable()) {
// 通道可读
} else if (key.isWritable()) {
// 通道可写
}

it.remove();
}
}

2.6 实例演示

原本打算将示例演示的代码放在本节中展示,奈何文章篇幅已经很大了,所以决定把本节的内容独立成文。在下一篇文章中,我将会演示使用 Java NIO 完成一个简单的 HTTP 服务器。这里先贴张效果图,如下:

tinyhttpd_w

3.总结

到这里,本文差不多就要结束了。原本只是打算简单说说 Selector 的用法,然后再写一份实例代码。但是后来发现这样写显得比较空洞,没什么深度。所以后来翻了一下 Selector 的源码,大致理解了 Selector 的逻辑,然后就有了上面的分析。不过 Selector 的逻辑并不止我上面所说的那些,还有一些内容我现在还没看,所以就没有讲。对于已写出来的分析,由于我个人水平有限,难免会有错误。如果有错误,也欢迎大家指出来,共同进步!

好了,本文到此结束,感谢大家的阅读。

参考

附录

文中贴的一些代码是没有包含在 JDK src.zip 包里的,这里单独列举出来,方便大家查找。

文件名路径
DefaultSelectorProvider.javajdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java
EPollSelectorProvider.javajdk/src/solaris/classes/sun/nio/ch/EPollSelectorProvider.java
SelectorImpl.javajdk/src/share/classes/sun/nio/ch/SelectorImpl.java
EPollSelectorImpl.javajdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java
EPollArrayWrapper.javajdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java
SelectionKeyImpl.javajdk/src/share/classes/sun/nio/ch/SelectionKeyImpl.java
SocketChannelImpl.javajdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java
EPollArrayWrapper.cjdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c