前导文章:
《常见IO模型浅析》
《说说Redis的非阻塞IO多路复用技术》

1 缓冲区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/**
* 一、根据数据类型不同(boolean除外),
* 提供了相应类型的缓冲区
* ByteBuffer
* CharBuffer
* ShortBuffer
* IntBuffer
* LongBuffer
* FloatBuffer
* DoubleBuffer
*
通过allocate获取缓冲区
*
* 二、缓冲区的中有两个核心方法:
* 1 put 存入数据到缓冲区
* 2 get 获取缓存区中的数据
*
* 三、四个核心属性
* capacity : 缓冲区最大存储数据的容量,一旦声明不能修改
*
* 写数据模式
* limit : 缓冲区存在的数据量
* position : 标识缓冲区中正在操作数据的位置
*
* 四、mark : 标记,标识记录当前position的位置,可以通过
* reset 恢复到mark的位置
*
* 五、直接缓冲区与非直接缓冲区
* 非直接缓冲区:通过allocate方法分配缓冲区,
* 将缓冲区建立在JVM的内存中
* 直接缓冲区:通过allocateDirect方法分配直接缓冲区,
* 缓冲区建立在物理内存中,可以提高效率
*
* position < = limit < capacity
* @author dqf
*
*/
public class TestBuffer {
@Test
public void test3(){
ByteBuffer buf = ByteBuffer.allocateDirect(1024);
System.out.println(buf.isDirect());
}
@Test
public void test2(){
String str = "abcde";
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(str.getBytes());
buf.flip();
byte[] dst = new byte[buf.limit()];
buf.get(dst,0,2);
System.out.println(new String(dst,0,2));
//position 2
System.out.println(buf.position());
//mark 标记
buf.mark();
buf.get(dst,2,2);
System.out.println(new String(dst,2,2));
// position 4
System.out.println(buf.position());
//reset 恢复到mark的位置
buf.reset();
// position 2
System.out.println(buf.position());
/**
* 判断缓冲区中是否还有剩余数据
*/
if(buf.hasRemaining()){
// 获取缓冲区中可以操作的数量 3
System.out.println(buf.remaining());
}
}
@Test
public void test1(){
/**
* 初始化时,
* capacity = limit = 1024
* position = 0
*/
ByteBuffer buf = ByteBuffer.allocate(1024);
String str = "abcde";
/**
* capacity = limit = 1024
* position = 5
*/
buf.put(str.getBytes());
/**
* 切换到读数据模式
* position = 0
* limit = 5
* capacity = 1024
*/
buf.flip();
/**
* 利用get读取缓冲区的数据
* position = 5
* limit = 5
* capacity = 1024
*/
byte[] dst = new byte[buf.limit()];
buf.get(dst);
System.out.println(new String(dst,0,dst.length));
/**
* rewind : 重读
* position = 0
* limit = 5
* capacity = 1024
*/
buf.rewind();
/**
* clear : 清空缓冲区,其实缓冲区中原来的数据依然存在,
* 只是把position指针重新指到0的位置
* position = 0
* limit = 1024
* capacity = 1024
*/
buf.clear();
// 还能获取到值
System.out.println((char)buf.get());
}
}

2 通道

文件复制过程:

  1. 将通道1中的数据写入缓冲区
  2. 将缓冲区中的数据写入通道2

read是往缓冲区写,write是往通道写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* 一、通道
* 用于源节点与目标节点的连接。在Java NIO中负责缓冲区中数据的传输。
* 通道本身不存储数据,因此需要配合缓冲区
*
* 二、通道的主要实现类
* java.nio.channel.Channel接口
* FileChannel SocketChannel ServerSocketChannel
* DatagramChannel
*
* 三、获取通道
* 1 Java 针对支持通道的类提供了getChannel方法
* 本地IO:FileInputStream/FileOutputStream/RandomAccessFile
* 网络IO:Socket/ServerSocket/DatagramSocket
*
* 2 在JDK7中NIO.2针对通道提供了静态方法open
*
* 3 在JDK7中NIO.2的Files工具类的newByteChannel()方法
*
* 四、通道之间的数据传输,通过直接缓冲区
* transferFrom()
* transferTo()
* @author dqf
*
*/
public class TestChannel {
/**
* 通道之间的数据传输,也是通过直接缓冲区
*/
@Test
public void test3()throws Exception{
FileChannel inChannel =
FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
FileChannel outChannel =
FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
inChannel.transferTo(0, inChannel.size(), outChannel);
// outChannel.transferFrom(inChannel, 0, inChannel.size());
inChannel.close();
outChannel.close();
}
/**
* 使用直接缓冲区完成文件的复制(内存映射文件,和allocateDirect一样)
* 不需要通道
*/
@Test
public void test2()throws Exception{
FileChannel inChannel =
FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
FileChannel outChannel =
FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
// 内存映射文件(只有ByteBuffer支持内存映射的方式直接缓冲区)
MappedByteBuffer inMapBuf = inChannel.map(MapMode.READ_ONLY , 0, inChannel.size());
MappedByteBuffer outMapBuf = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());
//直接对缓冲区对数据进行读写操作
byte[] dst = new byte[inMapBuf.limit()];
inMapBuf.get(dst);
outMapBuf.put(dst);
inChannel.close();
outChannel.close();
}
/**
* 利用通道完成文件的复制
*/
@Test
public void test1() throws Exception {
FileInputStream fis = new FileInputStream("1.jpg");
FileOutputStream fos = new FileOutputStream("2.jpg");
FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel();
//分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
//将通道中的数据存入缓冲区
while(inChannel.read(buf)!=-1){
buf.flip();
//将缓冲区中的数据写入通道
outChannel.write(buf);
buf.clear();
}
outChannel.close();
inChannel.close();
fos.close();
fis.close();
}
}

3 阻塞 IO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 一、使用NIO完成网络通信的三个核心
* 1 通道
* 2 缓冲区
* 3 选择器
* @author dqf
*
*/
public class TestBlockNIO {
// 客户端
@Test
public void client() throws IOException{
open(new InetSocketAddress("127.0.0.1",9898));
FileChannel inChanel = FileChannel.
open(Paths.get("1.jgp"), StandardOpenOption.READ);
ByteBuffer buf = ByteBuffer.allocate(1024);
// 将file通过中是数据写入缓冲区
while(inChanel.read(buf) != -1){
buf.flip();
// 将缓冲区中的数据写入通道
sChannel.write(buf);
buf.clear();
}
int len = 0;
// 将通道中的数据写入缓冲区
while((len = sChannel.read(buf)) != -1){
buf.flip();
System.out.println(new String(buf.array(),0,len));
buf.clear();
}
inChanel.close();
sChannel.close();
}
// 服务器端
@Test
public void server() throws IOException{
ServerSocketChannel ssChannel = ServerSocketChannel.open();
FileChannel outChanel = FileChannel.
open(Paths.get("2.jgp"), StandardOpenOption.READ);
ssChannel.bind(new InetSocketAddress(9898));
SocketChannel sChannel = ssChannel.accept();
ByteBuffer buf = ByteBuffer.allocate(1024);
while(sChannel.read(buf) != -1){
buf.flip();
outChanel.write(buf);
buf.clear();
}
buf.put("服务器接收数据:".getBytes());
buf.flip();
// 缓冲区数据写入通道
sChannel.write(buf);
sChannel.close();
outChanel.close();
ssChannel.close();
}
}

4 非阻塞 IO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* 非阻塞IO
* @author dqf
*
*/
public class testNoBlocking {
//客户端
@Test
public void client()throws Exception{
// 获取通道
SocketChannel sChannel =
SocketChannel.open(new InetSocketAddress("127.0.0.1",9898));
// 切换成非阻塞模式
sChannel.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(1024);
// 发送数据给服务器端
buf.put(new Date().toString().getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
sChannel.close();
}
/**
* 服务器端
*/
@Test
public void server() throws Exception {
//1 获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2 切换非阻塞模式
ssChannel.configureBlocking(false);
//3 绑定链接
ssChannel.bind(new InetSocketAddress(9898));
//4 获取选择器
Selector selector = Selector.open();
//5 将通道注册到选择器上
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
//6 轮询获取选择器上 准备就绪 的时间
while(selector.select() > 0 ) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey sk = it.next();
// 判断具体是什么事件准备就绪
if(sk.isAcceptable()){
SocketChannel sChanel = ssChannel.accept();
// 切换客户端非阻塞模式
sChanel.configureBlocking(false);
//将 该通道注册到选择器上
sChanel.register(selector, SelectionKey.OP_READ);
}else if(sk.isReadable()){
SocketChannel sChannel = (SocketChannel) sk.channel();
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0 ;
while((len = sChannel.read(buf)) > 0){
buf.flip();
System.out.println(new String(buf.array(),0,len));
buf.clear();
}
}
// 取消选择键
it.remove();
}
}
}
}