// NioEchoServer.java (forked from aofeng/JavaTutorial)
package cn.aofeng.demo.nio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Line-oriented echo server built on non-blocking NIO.
 *
 * <p>Every line terminated by CRLF ({@code \r\n}) that a client sends is written back to
 * that client unchanged. A line whose content is {@code exit} or {@code quit}
 * (case-insensitive) is echoed and then the connection is closed. Bytes received after the
 * last complete line are kept as the attachment of the connection's {@link SelectionKey}
 * until the rest of the line arrives.
 *
 * <p>All {@link ByteBuffer} helpers in this class work on the <em>remaining</em> bytes of a
 * buffer (from {@code position} to {@code limit}) and never change the marks of the buffer
 * they are given.
 *
 * @author 聂勇 <a href="mailto:aofengblog@163.com">aofengblog@163.com</a>
 */
public class NioEchoServer {

    private final static Logger logger = Logger.getLogger(NioEchoServer.class.getName());

    /** Carriage return, first byte of the line terminator. */
    public final static char CR = '\r';

    /** Line feed, second byte of the line terminator. */
    public final static char LF = '\n';

    /** Size in bytes of the buffer allocated for each socket read. */
    private final static int READ_BUFFER_SIZE = 1024;

    /** Charset used to decode a received line. */
    private final static String CHARSET_NAME = "UTF-8";

    /**
     * @return the line separator of the current platform
     */
    private static String getLineEnd() {
        return System.getProperty("line.separator");
    }

    /**
     * Copies the remaining bytes of a buffer into a new array.
     *
     * @param buffer source buffer, must not be null; its position and limit are left untouched
     * @return a new array holding the bytes between position and limit
     */
    private static byte[] remainingBytes(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        // Read through a duplicate so that the caller's position is not moved.
        buffer.duplicate().get(bytes);
        return bytes;
    }

    /**
     * Renders the remaining bytes of a buffer as a list of decimal byte values, for logging.
     *
     * @param buffer byte buffer, may be null
     * @return e.g. {@code "[104, 105]"}, or {@code "null"} when the buffer is null
     */
    private static String toDisplayChar(ByteBuffer buffer) {
        if (null == buffer) {
            return "null";
        }
        return Arrays.toString(remainingBytes(buffer));
    }

    /**
     * Decodes the remaining bytes of a buffer as UTF-8.
     *
     * @param buffer byte buffer, may be null
     * @return the decoded string, or null when the buffer is null
     * @throws UnsupportedEncodingException if the UTF-8 charset is not available
     */
    private static String convert2String(ByteBuffer buffer) throws UnsupportedEncodingException {
        if (null == buffer) {
            return null;
        }
        return new String(remainingBytes(buffer), CHARSET_NAME);
    }

    /**
     * Decodes a line as UTF-8 after dropping its trailing CRLF terminator.
     *
     * @param buffer byte buffer holding one line, may be null
     * @return the line without its terminator; the whole content when it does not end with
     *         CRLF; null when the buffer is null
     * @throws UnsupportedEncodingException if the UTF-8 charset is not available
     * @see #convert2String(ByteBuffer)
     */
    private static String getLineContent(ByteBuffer buffer) throws UnsupportedEncodingException {
        if (null == buffer) {
            return null;
        }
        byte[] bytes = remainingBytes(buffer);
        int length = bytes.length;
        // Strip the terminator only when it is really there, so short input cannot underflow.
        if (length >= 2 && CR == bytes[length - 2] && LF == bytes[length - 1]) {
            length -= 2;
        }
        return convert2String(ByteBuffer.wrap(bytes, 0, length));
    }

    /**
     * Concatenates the remaining bytes of two buffers without changing either of them:
     * <pre>
     * merged = first + second
     * </pre>
     *
     * @param first  buffer whose content comes first, may be null
     * @param second buffer whose content comes last, may be null
     * @return a new buffer positioned at 0 holding both contents, or null when both
     *         arguments are null
     */
    private static ByteBuffer merge(ByteBuffer first, ByteBuffer second) {
        if (null == first && null == second) {
            return null;
        }

        int oneSize = null != first ? first.remaining() : 0;
        int twoSize = null != second ? second.remaining() : 0;
        ByteBuffer result = ByteBuffer.allocate(oneSize + twoSize);
        if (null != first) {
            result.put(first.duplicate());
        }
        if (null != second) {
            result.put(second.duplicate());
        }
        result.flip();

        return result;
    }

    /**
     * Extracts the first line, terminator included, from the remaining bytes of a buffer.
     * A terminator is a CR immediately followed by an LF; a lone CR or a lone LF is ordinary
     * content.
     *
     * @param buffer input buffer, may be null; its position and limit are left untouched
     * @return a new buffer holding the bytes from the current position up to and including
     *         the first CRLF, or null when no CRLF is found
     */
    private static ByteBuffer getLine(ByteBuffer buffer) {
        if (null == buffer) {
            return null;
        }

        int start = buffer.position();
        int end = buffer.limit();
        // Absolute reads keep the caller's position where it was.
        for (int i = start + 1; i < end; i++) {
            if (LF == buffer.get(i) && CR == buffer.get(i - 1)) {
                byte[] copy = new byte[i - start + 1];
                buffer.duplicate().get(copy);
                return ByteBuffer.wrap(copy);
            }
        }

        return null;
    }

    /**
     * Cancels a client key and closes its channel, dropping any buffered partial line.
     *
     * @param selectionKey key of the client connection to close
     */
    private static void closeConnection(SelectionKey selectionKey) {
        selectionKey.attach(null);
        selectionKey.cancel();
        try {
            selectionKey.channel().close();
        } catch (IOException e) {
            logger.log(Level.FINE, "关闭客户端连接出错", e);
        }
    }

    /**
     * Reads the bytes available on a client connection, echoes every complete line and keeps
     * the trailing partial line as the attachment of the key.
     *
     * @param selector     selector the connection is registered with (not used here, kept for
     *                     signature compatibility)
     * @param selectionKey readable key of the client connection
     * @throws IOException if reading from or writing to the channel fails
     */
    private static void readData(Selector selector, SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

        // Partial line left over from earlier reads, if any.
        ByteBuffer oldBuffer = (ByteBuffer) selectionKey.attachment();
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("上一次读取的数据:" + oldBuffer + getLineEnd() + toDisplayChar(oldBuffer));
        }

        ByteBuffer newBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int readNum = socketChannel.read(newBuffer);
        if (readNum < 0) {
            // End of stream: the peer closed the connection. Without closing our side the
            // key would be reported readable on every select and the channel would leak.
            closeConnection(selectionKey);
            return;
        }
        if (0 == readNum) {
            return;
        }
        newBuffer.flip();
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("这次读取的数据:" + newBuffer + getLineEnd() + toDisplayChar(newBuffer));
        }

        // Search the combined data so a CRLF split across two reads is still recognised.
        ByteBuffer pending = merge(oldBuffer, newBuffer);
        ByteBuffer line;
        while (null != (line = getLine(pending))) {
            if (logger.isLoggable(Level.FINE)) {
                logger.fine("准备输出的数据:" + line + getLineEnd() + toDisplayChar(line));
            }
            // Capture content and length before writing, because writing consumes the buffer.
            String lineStr = getLineContent(line);
            int lineLength = line.remaining();

            // A non-blocking write may be partial, so repeat until the line is fully sent.
            while (line.hasRemaining()) {
                socketChannel.write(line);
            }

            if (logger.isLoggable(Level.FINE)) {
                logger.fine("判断是否退出的行数据:" + lineStr);
            }
            if ("exit".equalsIgnoreCase(lineStr) || "quit".equalsIgnoreCase(lineStr)) {
                closeConnection(selectionKey);
                return;
            }

            // Step over the echoed line and look for the next one in the same chunk.
            pending.position(pending.position() + lineLength);
        }

        // Keep whatever follows the last complete line for the next read.
        ByteBuffer leftover = pending.hasRemaining() ? merge(pending, null) : null;
        selectionKey.attach(leftover);
        if (null != leftover && logger.isLoggable(Level.FINE)) {
            logger.fine("暂存到SelectionKey的数据:" + leftover + getLineEnd() + toDisplayChar(leftover));
        }
    }

    /**
     * Accepts a pending connection and registers it with the selector for reading.
     *
     * @param selector     selector the new connection is registered with
     * @param selectionKey acceptable key of the server channel
     * @return the accepted channel, or null when no connection was pending
     * @throws IOException            if accepting or configuring the connection fails
     * @throws ClosedChannelException if the accepted channel is already closed when registered
     */
    private static SocketChannel acceptNew(Selector selector,
            SelectionKey selectionKey) throws IOException,
            ClosedChannelException {
        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socketChannel = server.accept();
        if (null != socketChannel) {
            if (logger.isLoggable(Level.INFO)) {
                logger.info("收到一个新的连接,客户端IP:"+socketChannel.socket().getInetAddress().getHostAddress()+",客户端Port:"+socketChannel.socket().getPort());
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        }

        return socketChannel;
    }

    /**
     * Starts the server and runs the select loop until the listening side fails.
     *
     * @param port          port to listen on
     * @param selectTimeout maximum time one {@link Selector#select(long)} call blocks, in
     *                      milliseconds
     */
    private static void startServer(int port, int selectTimeout) {
        ServerSocketChannel serverChannel = null;
        Selector selector = null;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket serverSocket = serverChannel.socket();
            serverSocket.bind(new InetSocketAddress(port));
            if (logger.isLoggable(Level.INFO)) {
                logger.info("NIO echo网络服务启动完毕,监听端口:" +port);
            }

            selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int selectNum = selector.select(selectTimeout);
                if (0 == selectNum) {
                    continue;
                }

                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    it.remove();

                    // A key can have been cancelled since it was selected.
                    if (!selectionKey.isValid()) {
                        continue;
                    }

                    // Accept a new connection; a failure here is a server-level error.
                    if (selectionKey.isAcceptable()) {
                        acceptNew(selector, selectionKey);
                    }

                    // Read and echo client data; a failure only affects that client.
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        try {
                            readData(selector, selectionKey);
                        } catch (IOException e) {
                            logger.log(Level.WARNING, "处理客户端连接出错,关闭该连接", e);
                            closeConnection(selectionKey);
                        }
                    }
                } // end of while iterator
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "处理网络连接出错", e);
        } finally {
            // Release the selector and the listening socket on the way out.
            if (null != selector) {
                try {
                    selector.close();
                } catch (IOException e) {
                    logger.log(Level.WARNING, "关闭Selector出错", e);
                }
            }
            if (null != serverChannel) {
                try {
                    serverChannel.close();
                } catch (IOException e) {
                    logger.log(Level.WARNING, "关闭监听通道出错", e);
                }
            }
        }
    }

    /**
     * Starts the echo server on port 9090 with a select timeout of 1000 milliseconds.
     *
     * @param args command-line arguments, ignored
     */
    public static void main(String[] args) {
        int port = 9090;
        int selectTimeout = 1000;
        startServer(port, selectTimeout);
    }

}