JAVA NIO(二)基础 内存管理 文件锁定 Socket服务器客户端通信,
JAVA NIO(二)基础 内存管理 文件锁定 Socket服务器客户端通信,
NIO简介
nio的包主要有四个,分别是
1. 缓冲区包:java.nio 出现版本:Java SE1.4
2. 通道包:java.nio.channels 出现版本:Java SE1.4
3. 字符集包:java.nio.charset 出现版本:Java SE1.4
4. 文件处理包:java nio.file 出现版本:Java SE1.7
a)通道数据的进出,都要经过缓冲区
b)通道是一种新的原生I/O抽象概念,好比连接两端数据的管道,用于数据的交互
c)字符集包:大多数字符集的集合,处理字节字符之间相互转换
d)文件包:处理目录和文件。包含io中File类功能,比之更加强大,更具有名字等价的意义
1.缓冲Buffer
所有的基本数据类型都有相应的缓冲器(布尔型除外),但字节是操作系统及其 I/O设备使用的基本数据类型,所以唯一与通道交互的缓冲器是ByteBuffer。后面再讲字符集转换时会用到CharBuffer
Buffer基础
有几个标志位,用于操作缓冲区
容量(capacity):缓冲区大小(byte),读出、写入值都不会变
位置(position):下一个字节被读出写入的位置,注意:position永远小于limit
界限(limit) :写入时(界限=容量),读出时(上次写入时position的位置)
标记(mark) :初始化为-1,调用reset()可回到标记位置
public static void main(String[] args) {
// 初始方法化一:直接用包装的数组作为缓冲区,不再分配空间
ByteBuffer temp = ByteBuffer.wrap("字节数组".getBytes());
// 初始方法化二,初始化一个容量大小为10的 缓冲区。注意: 初始化时,为写入状态
ByteBuffer bb = ByteBuffer.allocate(10);
bb.capacity();// 容量:10
bb.position();// 位置:0
bb.limit();// 界限:10
// 写入字节
bb.put("345".getBytes());
// position = 3
// 倒回 执行position = 0 ; mark = -1
bb.rewind();
// 写入字节
bb.put("012".getBytes());
// position = 3
// 做个标记(mark)
bb.mark();
// mark = position = 3;
// 继续写入
bb.put("678".getBytes());
// position = 6
// 重置:回到记号处
bb.reset();
// position = mark = 3
// 继续写入
bb.put("345".getBytes());
// position = 6
// 切换到写入状态,调用下面方法
bb.flip();
// 执行:limit = position = 6 ; position = 0 ; mark = -1;
// position=0 可以从头开始读出数据。因为bb中只有6个字节,所以被设置读出的界限为6
//剩下可读取的字节数
bb.remaining();
while (bb.hasRemaining()) {// 是否还有没有读取的字节
// 读出1个字节
bb.get();
}
// position = 6
// 继续从头读
bb.rewind();
// position = 0
// 设置position = 3
bb.position(3);
System.out.println((char) bb.get()); // 输出3
// 包含索引的get(),不会改变position的值
System.out.println((char) bb.get(1)); // 输出1
// position = 4
// 创建一个只读的缓冲区,两个缓冲区共享数据元素
ByteBuffer readOnly = bb.asReadOnlyBuffer();
// 复制一个缓冲区,两个缓冲共享数据元素,有各自的位置、标记等。
// 如果原始的缓冲区为只读,或者为直接缓冲区,新的缓冲区将继承这些属性
ByteBuffer duplicate = bb.duplicate();
// 创建一个从原始缓冲区的当前位置开始的新缓冲区
// 其容量是原始缓冲区的剩余元素数量(limit-position)
ByteBuffer slice = bb.slice();
// 判断缓冲区是否是只读
readOnly.isReadOnly(); // true
// 切换到写入状态
bb.clear();
// position = 0 ; limit=10
}
间接缓冲区:
ByteBuffer bb = ByteBuffer.allocate(1024);
CharBuffer cb = CharBuffer.allocate(1024);
// 对缓冲区的修改会影响到wrap的数组
ByteBuffer bbw = ByteBuffer.wrap("wrap".getBytes());
CharBuffer cbw = CharBuffer.wrap("aaa");
//还有其他非boolean的基本类型可以这样创建
直接缓冲区:
// 创建一个直接缓冲区,只有字节缓冲区有这个工厂方法
ByteBuffer bb = ByteBuffer.allocateDirect(1024);
// 构建一个通道
FileChannel fc = new FileInputStream(new File("文件路径")).getChannel();
// 将数据从磁盘读到直接缓冲区
fc.read(bb);
内存映射文件:
测试一下3种缓冲内存使用情况
通道channel
FileChannel
public static void main(String[] args) throws IOException {
File file = new File("D:\\TEXT.txt");
// 只能用于读的通道
FileChannel readCh = new FileInputStream(file).getChannel();
// 只能用于写的通道
FileChannel writCh = new FileOutputStream(file).getChannel();
// 可以读也可以写的通道
FileChannel rwCh = new RandomAccessFile(file, "rw").getChannel();
// 在Java SE1.7中提供了新的初始化方法
// 这个Path后面会介绍
Path path = Paths.get("D:", "TEXT.txt");
// 只能用于读的通道
FileChannel nReadCh = FileChannel.open(path, StandardOpenOption.READ);
// 只能用于写的通道
FileChannel nWriteCh = FileChannel.open(path, StandardOpenOption.WRITE);
// 可以读也可以写的通道,第二个参数可以是个数组
FileChannel nReadWrite = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.READ);
}
5. 基本方法
public static void main(String[] args) throws IOException {
File file = new File("D:\\t2.txt");
// 如果文件不存,就会直接创建一个空文件
FileChannel rw = new RandomAccessFile(file, "rw").getChannel();
// 初始化一个缓冲区
ByteBuffer bb = ByteBuffer.wrap("temp".getBytes());
// 注意 通道也有一个读写的位置,而且是从底层的文件描述符获得的
// 这也就意味着一个对象对该position的更新可以被另一个对象看到
rw.position();
// 向通道道写入数据
while (bb.hasRemaining()) {
// position放在末尾,write会自动对文件进行扩容
rw.write(bb);
}
// 先清空缓冲区
bb.clear();
// 通道的读写文件位置在末尾,设置到0位置
rw.position(0);
// 读数据到缓冲区
rw.read(bb);
// 通道关联文件的大小(字节)
rw.size();
// 截断文件,只保留前三个字节,其他删掉
rw.truncate(3);
// 所有的现代文件系统都会缓存数据和延迟磁盘文件更新以提高性能。调用force()方法要求文件的所有待定修改立即同步到磁盘
// boolean参数设置 元数据 是否要写到磁盘
// 元数据:指文件所有者、访问权限、最后一次修改时间等信息
// 同步元数据要求操作系统至少一次的I/O操作,为了提高性能,可以不同步元数据,同时也不会牺牲数据完整性
rw.force(false);
// 关闭通道
if (rw.isOpen())
rw.close();
}
6. Channel-to-Channel传输
public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel {
// 这里仅列出部分API
public abstract long transferTo (long position, long count, WritableByteChannel target)
public abstract long transferFrom (ReadableByteChannel src, long position, long count)
}
transferTo()和transferFrom()方法允许将一个通道交叉连接到另一个通道,而不需要通过一个中间缓冲区来传递数据。只有FileChannel类有这两个方法,因此Channel-to-Channel传输中通道之一必须是FileChannel。不能在socket通道之间直接传输数据,不过socket通道实现WritableByteChannel和ReadableByteChannel接口,因此文件的内容可以用transferTo()方法传输给一个socket通道,或者也可以用transferFrom()方法将数据从一个socket通道直接读取到一个文件中。
public static void main(String[] args) throws IOException {
// 将两个通道相连传输数据
File inFile = new File("D:" + File.separator + "temp.png");
File outFile = new File("E:" + File.separator + "temp.png");
FileChannel in = new FileInputStream(inFile).getChannel();
FileChannel out = new FileOutputStream(outFile).getChannel();
// 将inFile文件数据拷贝到outFile
out.transferFrom(in, 0, in.size());
in.transferTo(0, in.size(), out);
}
7. 通道可以向缓冲区数组写入数据,并按顺序填充每个缓冲区直到所有缓冲区满或者没有数据可读为止。聚集写也是以类似的方式完成,数据从列表中的每个缓冲区中顺序取出来发送到通道就好像顺序写入一样
文件锁定
socket通道
SocketChannel和DatagramChannel都实现了读写功能的接口,而ServerSocketChannel并没有实现。ServerSocketChannel只负责监听传入的连接和创建SocketChannel对象。
真正的就绪选择必须由操作系统来做。操作系统的一项最重要的功能就是处理I/O请求并通知各个线程它们的数据已经准备好了。选择器类提供了这种抽象,使用Java代码能够以可移植的方式,请求底层的操作系统提供就绪选择服务。
可以只用一个线程监控通道的就绪状态并使用一个协调好的工作线程池来处理共接收到的数据。根据部署的条件,线程池的大小是可以调整的(或者它自己进行动态的调整)。
package i.io.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
public class SelectSockets {
/**
* 服务器端
*/
public static class ServerSocketListen implements Runnable {
@Override
public void run() {
try {
server(1234);
} catch (Exception e) {
e.printStackTrace();
}
}
public void server(int... port) throws Exception {
// 初始化一个选择器
Selector selector = Selector.open();
// 监听多个端口
for (int pt : port) {
System.out.println("Listening on port " + pt);
// 初始化一个服务器套接字通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置端口服务器通道监听的端口
serverChannel.bind(new InetSocketAddress(pt));
// 设置监听套接字的非阻塞模式
serverChannel.configureBlocking(false);
// 注册ServerSocketChannel选择器
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
while (true) {
// 这个可能阻塞很长时间,返回后选择集包含准备好的通道键
if (selector.select() == 0)
continue;
// 处理准备好的通道
handleChannel(selector);
}
}
/**
* 注册通道和通道感兴趣的业务到选择器
*/
protected void handleChannel(Selector selector) throws Exception {
// 得到选择键的迭代器
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 有新的连接
if (key.isAcceptable()) {
// 得到服务器通道
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
// 设置通道为非阻塞
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}
// 有可读取数据的通道
if (key.isReadable()) {
readDataFromSocket(key);
}
it.remove();
}
}
/**
* 读取通道的数据
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
// 初始化缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 将数据读到缓冲区
while (socketChannel.read(buffer) > 0) {
// 切换到缓冲区到读模式
buffer.flip();
// 以字符视图打开缓冲
CharBuffer cb = buffer.asCharBuffer();
StringBuilder messClient = new StringBuilder();
while (cb.hasRemaining()) {
messClient.append(cb.get());
}
System.err.println("2-服务器端接收客户端数据:" + messClient.toString());
buffer.clear();
}
// 回写
System.err.println("3-反馈数据到客户端:go");
Charset charset = Charset.forName("gbk");
socketChannel.write(charset.encode("还可以吧"));
}
}
/**
* 客户端类
*/
public static class ClientSocketListen {
public void client() throws IOException {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_CONNECT);
sc.connect(new InetSocketAddress(1234));
while (true) {
if (selector.select() == 0)
continue;
handleChannel(selector);
}
}
protected void handleChannel(Selector selector) throws ClosedChannelException, IOException {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
int op = key.readyOps();
SocketChannel channel = (SocketChannel) key.channel();
// 监听这个通道,用于接收服务器端的反馈数据
if ((op & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT) {
if (channel.finishConnect()) {
//一个选择器只能有一个当前通道的实例,
// channel.register(selector, SelectionKey.OP_READ);
key.interestOps(SelectionKey.OP_READ);
System.out.println("1-客户端发数据到服务器:go");
ByteBuffer bb = ByteBuffer.allocate(8);
bb.asCharBuffer().put("4个字符");
channel.write(bb);
bb.clear();
}
}
// 接收服务器端反馈数据
if ((op & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
ByteBuffer bb = ByteBuffer.allocate(1024);
while (channel.read(bb) > 0) {
bb.flip();
Charset charset = Charset.forName("gbk");
System.out.println("4-客户端接收服务器反馈数据:" + charset.decode(bb));
}
}
it.remove();
}
}
}
public static void main(String[] argv) throws Exception {
// 先用一个线程启动服务器端
new Thread(new SelectSockets.ServerSocketListen()).start();
// 客户端调用
new SelectSockets.ClientSocketListen().client();
}
}
DatagramChannel是面向UDP的,DatagramChannel对象既可以充当服务器(监听者)也可以充当客户端(发送者), 不同于SocketChannel(必须连接了才有用并且只能连接一次),DatagramChannel对象可以任意次数地进行连接或断开连接。每次连接都可以到一个不同的远程地址。调用disconnect()方法可以配置通道,以便它能再次接收来自安全管理器(如果已安装)所允许的任意远程地址的数据或发送数据到这些地址上。
列出几种使用数据包的情况
程序可以承受数据丢失或无序的数据。
希望「发射后不管」(fire and forget)而不需要知道您发送的包是否已接收。
数据吞吐量比可靠性更重要。
您需要同时发送数据给多个接受者(多播或者广播)。
包隐喻比流隐喻更适合手边的任务。
相关文章
- 暂无相关文章
用户点评