欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

通过 Java NIO 实现 Socket 服务器与客户端功能(niosocket)

来源: javaer 分享于  点击 31905 次 点评:192

通过 Java NIO 实现 Socket 服务器与客户端功能(niosocket)


package niocommunicate;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Non-blocking TCP server built on {@link Selector}.
 *
 * <p>One pool thread runs the select loop; reads and writes for individual
 * clients are handed to the same pool as short-lived tasks. Every request is
 * answered with the fixed payload {@code "response"}.
 */
public class Server {
    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;
    // Instance-scoped: close() shuts the pool down, which must not affect other instances.
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20));
    // Keys whose read task is currently in flight, indexed by the key's hashCode().
    private final Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} when it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates a non-blocking server channel bound to port 5555 and registers it
     * for accept events. On any failure the server is closed again.
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * Shuts the server down: stops the pool and closes the listening channel and the selector.
     */
    private void close() {
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the select loop on a pool thread. Any exception escaping the loop
     * is printed and closes the server.
     */
    private void start() {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            dispatch(selectedKey);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }
        });
    }

    /**
     * Handles one ready key: schedules a read, schedules a write of the attached
     * response, or accepts a new connection.
     *
     * @param selectedKey the key reported ready by the selector
     * @throws IOException if accepting or configuring a new client channel fails
     */
    private void dispatch(SelectionKey selectedKey) throws IOException {
        try {
            if (selectedKey.isReadable()) {
                // Skip keys that already have a read task in flight.
                if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
                    selectionKeyMap.put(selectedKey.hashCode(), selectedKey);
                    threadPool.execute(new ReadClientSocketHandler(selectedKey));
                }
            } else if (selectedKey.isWritable()) {
                Object responseMessage = selectedKey.attachment();
                SocketChannel clientChannel = (SocketChannel) selectedKey.channel();
                // Drop write interest first; a socket is almost always writable.
                selectedKey.interestOps(SelectionKey.OP_READ);
                if (responseMessage != null) {
                    threadPool.execute(new WriteClientSocketHandler(clientChannel, responseMessage));
                }
            } else if (selectedKey.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
                SocketChannel clientSocket = ssc.accept();
                if (clientSocket != null) {
                    clientSocket.configureBlocking(false);
                    clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }
            }
        } catch (CancelledKeyException cc) {
            selectedKey.cancel();
            selectionKeyMap.remove(selectedKey.hashCode());
        }
    }

    /**
     * Cancels the channel's key on this server's selector (if any) and closes the channel.
     *
     * @param channel the client channel to release
     */
    private void closeClient(SocketChannel channel) {
        SelectionKey key = channel.keyFor(selector);
        if (key != null) {
            key.cancel();
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Task that writes one response to a client.
     *
     * @author haoguo
     */
    private class WriteClientSocketHandler implements Runnable {
        private final SocketChannel client;
        private final Object responseMessage;

        /**
         * @param client          channel to write to
         * @param responseMessage payload; only {@code byte[]} and {@code String} are sent
         */
        WriteClientSocketHandler(SocketChannel client, Object responseMessage) {
            this.client = client;
            this.responseMessage = responseMessage;
        }

        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (responseMessage instanceof byte[]) {
                responseByteData = (byte[]) responseMessage;
                logResponseString = new String(responseByteData, StandardCharsets.UTF_8);
            } else if (responseMessage instanceof String) {
                logResponseString = (String) responseMessage;
                responseByteData = logResponseString.getBytes(StandardCharsets.UTF_8);
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                ByteBuffer outgoing = ByteBuffer.wrap(responseByteData);
                // Serialise writers per channel so two pooled tasks cannot interleave partial writes.
                synchronized (client) {
                    // A non-blocking write may accept only part of the buffer.
                    while (outgoing.hasRemaining()) {
                        if (client.write(outgoing) == 0) {
                            Thread.yield();
                        }
                    }
                }
                // The key may already be cancelled, in which case keyFor returns null.
                SelectionKey key = client.keyFor(selector);
                String clientId = key == null ? "unknown" : String.valueOf(key.hashCode());
                System.out.println("server响应客户端[" + clientId + "]数据 :[" + logResponseString
                        + "]");
            } catch (IOException e) {
                e.printStackTrace();
                closeClient(client);
            }
        }
    }

    /**
     * Task that drains everything currently readable from a client and
     * schedules the response.
     *
     * @author haoguo
     */
    private class ReadClientSocketHandler implements Runnable {
        private final SocketChannel client;
        private final ByteBuffer tmp = ByteBuffer.allocate(1024);
        private final SelectionKey selectionKey;

        /**
         * @param selectionKey the readable key; its channel must be a {@link SocketChannel}
         */
        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
        }

        @Override
        public void run() {
            try {
                ByteArrayOutputStream received = new ByteArrayOutputStream();
                tmp.clear();
                int len;
                while ((len = client.read(tmp)) > 0) {
                    received.write(tmp.array(), 0, len);
                    tmp.clear();
                }
                if (received.size() > 0) {
                    System.out.println("接收到客户端[" + selectionKey.hashCode() + "]数据 :["
                            + new String(received.toByteArray(), StandardCharsets.UTF_8) + "]");
                }
                if (len < 0) {
                    // End of stream: the peer closed. Without this the key stays
                    // readable forever and the select loop spins.
                    System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                    closeClient(client);
                    return;
                }
                if (received.size() == 0) {
                    return;
                }
                // dosomthing
                byte[] response = "response".getBytes(StandardCharsets.UTF_8);
                // Switch the key to write interest and attach the response for the select loop.
                client.register(selector, SelectionKey.OP_WRITE, response);
            } catch (IOException | CancelledKeyException e) {
                System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                closeClient(client);
            } finally {
                // Allow the select loop to schedule the next read for this key.
                selectionKeyMap.remove(selectionKey.hashCode());
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}
package niocommunicate;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
 * Non-blocking TCP client that connects to port 5555 on the local host.
 *
 * <p>A background thread runs the select loop. {@link #writeData(String)}
 * hands a message to that loop by re-registering the channel for
 * {@code OP_WRITE} with the message as the key attachment.
 */
public class Client {
    SocketChannel client;
    Selector selctor = getSelector();
    private volatile boolean run = true;
    private final List<Object> messageQueue = new LinkedList<>();

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} when it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Starts a non-blocking connect, launches the select-loop thread, and then
     * waits 200 ms to give the connection time to complete.
     */
    public Client() {
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);
            client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5555));
            client.register(selctor, SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                selectLoop();
            }
        }).start();
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Select loop executed by the background thread until {@link #close()} clears
     * the run flag. The selector is closed when the loop ends.
     */
    private void selectLoop() {
        try {
            while (run) {
                try {
                    if (selctor.select(20) == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selctor.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        if (selectionKey.isConnectable()) {
                            SocketChannel sc = (SocketChannel) selectionKey.channel();
                            sc.finishConnect();
                            sc.register(selctor, SelectionKey.OP_READ);
                        } else if (selectionKey.isWritable()) {
                            handleWritable(selectionKey);
                        } else if (selectionKey.isReadable()) {
                            handleReadable(selectionKey);
                        }
                    }
                } catch (CancelledKeyException ignored) {
                    // close() cancelled the key on another thread; the loop condition ends the loop.
                } catch (IOException e1) {
                    e1.printStackTrace();
                    close();
                }
            }
        } finally {
            if (selctor != null) {
                try {
                    selctor.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Sends the message attached to a writable key and switches the key back to read interest.
     *
     * @param selectionKey the writable key; its attachment is the pending message
     */
    private void handleWritable(SelectionKey selectionKey) {
        // Drop write interest first; a socket is almost always writable.
        selectionKey.interestOps(SelectionKey.OP_READ);
        Object requestMessage = selectionKey.attachment();
        SocketChannel writeSocketChannel = (SocketChannel) selectionKey.channel();
        byte[] requestByteData = null;
        if (requestMessage instanceof byte[]) {
            requestByteData = (byte[]) requestMessage;
        } else if (requestMessage instanceof String) {
            requestByteData = ((String) requestMessage).getBytes(StandardCharsets.UTF_8);
            System.out.println("client send Message:[" + requestMessage + "]");
        } else {
            // The attachment can be null, so do not call getClass() on it blindly.
            System.out.println("unsupport send Message Type"
                    + (requestMessage == null ? null : requestMessage.getClass()));
        }
        System.out.println("requestMessage:" + requestMessage);
        if (requestByteData != null && requestByteData.length > 0) {
            try {
                ByteBuffer outgoing = ByteBuffer.wrap(requestByteData);
                // A non-blocking write may accept only part of the buffer.
                while (outgoing.hasRemaining()) {
                    if (writeSocketChannel.write(outgoing) == 0) {
                        Thread.yield();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Reads up to 1024 bytes from a readable key and prints them; closes the
     * client when the server has closed the connection.
     *
     * @param selectionKey the readable key
     * @throws IOException if the read fails
     */
    private void handleReadable(SelectionKey selectionKey) throws IOException {
        SocketChannel readSocketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer tmp = ByteBuffer.allocate(1024);
        int len = readSocketChannel.read(tmp);
        if (len < 0) {
            // End of stream: without closing, the key stays readable and the loop spins.
            System.out.println("server closed the connection");
            close();
            return;
        }
        if (len > 0) {
            System.out.println("客户端接收到数据:[" + new String(tmp.array(), 0, len, StandardCharsets.UTF_8) + "]");
        }
    }

    /**
     * Stops the select loop and closes the channel. Safe to call more than once
     * and when the connection was never established.
     */
    public void close() {
        // Clear the flag first so the loop stops even if closing the channel fails.
        run = false;
        if (client == null) {
            return;
        }
        try {
            SelectionKey selectionKey = client.keyFor(selctor);
            if (selectionKey != null) {
                selectionKey.cancel();
            }
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Queues a message and hands each queued message to the select loop, waiting
     * 40 ms after each hand-off so the loop can send it before the next one
     * replaces the key attachment.
     *
     * @param data the message to send
     */
    public void writeData(String data) {
        messageQueue.add(data);
        while (messageQueue.size() > 0) {
            Object firstSendData = messageQueue.remove(0);
            try {
                client.register(selctor, SelectionKey.OP_WRITE, firstSendData);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                // Without the pause later messages would overwrite this one; stop and keep the rest queued.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        long t1 = System.currentTimeMillis();
        for (int i = 10; i < 200; i++) {
            client.writeData(i + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei"
                    + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei" + i);
        }
        long t2 = System.currentTimeMillis();
        System.out.println("总共耗时:" + (t2 - t1) + "ms");
        client.close();
    }
}
package niocommunicate;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Non-blocking TCP server built on {@link Selector}, with a per-client queue
 * of pending responses.
 *
 * <p>One pool thread runs the select loop; reads and writes for individual
 * clients are handed to the same pool as short-lived tasks. Each request is
 * answered with {@code "response"} followed by the first (up to) three
 * characters of the request.
 */
public class Server {
    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20));
    // Keys whose read task is currently in flight, indexed by the key's hashCode().
    private final Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();
    // Pending responses per key hashCode(); filled by read tasks, drained by the select loop.
    private final ConcurrentMap<Integer, Queue<Object>> responseMessageQueue = new ConcurrentHashMap<>();
    private volatile boolean run = true;
    private volatile boolean isClose = false;

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} when it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates a non-blocking server channel bound to port 5555 and registers it
     * for accept events. On any failure the server is closed again.
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * @return {@code true} once {@code close()} has run
     */
    public boolean isClose() {
        return isClose;
    }

    /**
     * Shuts the server down: stops the select loop and the pool, then closes
     * the listening channel and the selector.
     */
    private void close() {
        run = false;
        isClose = true;
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the select loop on a pool thread. Any exception escaping the loop
     * is printed and closes the server.
     */
    private void start() {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    while (run) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            dispatch(selectedKey);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }
        });
    }

    /**
     * Handles one ready key: schedules a read, schedules writes for all queued
     * responses, or accepts a new connection.
     *
     * @param selectedKey the key reported ready by the selector
     * @throws IOException if accepting or configuring a new client channel fails
     */
    private void dispatch(SelectionKey selectedKey) throws IOException {
        try {
            if (selectedKey.isReadable()) {
                // Skip keys that already have a read task in flight.
                if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
                    selectionKeyMap.put(selectedKey.hashCode(), selectedKey);
                    threadPool.execute(new ReadClientSocketHandler(selectedKey));
                }
            } else if (selectedKey.isWritable()) {
                SocketChannel clientChannel = (SocketChannel) selectedKey.channel();
                // Drop write interest first; a socket is almost always writable.
                selectedKey.interestOps(SelectionKey.OP_READ);
                Queue<Object> pending = responsesFor(selectedKey.hashCode());
                Object responseMessage;
                while ((responseMessage = pending.poll()) != null) {
                    threadPool.execute(new WriteClientSocketHandler(clientChannel, responseMessage));
                }
            } else if (selectedKey.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
                SocketChannel clientSocket = ssc.accept();
                if (clientSocket != null) {
                    clientSocket.configureBlocking(false);
                    clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }
            }
        } catch (CancelledKeyException cc) {
            selectedKey.cancel();
            int hashCode = selectedKey.hashCode();
            selectionKeyMap.remove(hashCode);
            responseMessageQueue.remove(hashCode);
        }
    }

    /**
     * Returns the response queue for a key, creating it on first use. Both the
     * select loop and read tasks call this, so creation is atomic.
     *
     * @param keyHash the {@code hashCode()} of the client's selection key
     * @return the queue of pending responses, never {@code null}
     */
    private Queue<Object> responsesFor(int keyHash) {
        Queue<Object> queue = responseMessageQueue.get(keyHash);
        if (queue == null) {
            Queue<Object> created = new ConcurrentLinkedQueue<>();
            Queue<Object> existing = responseMessageQueue.putIfAbsent(keyHash, created);
            queue = existing != null ? existing : created;
        }
        return queue;
    }

    /**
     * Cancels the channel's key on this server's selector (if any), drops its
     * queued responses, and closes the channel.
     *
     * @param channel the client channel to release
     */
    private void closeClient(SocketChannel channel) {
        SelectionKey key = channel.keyFor(selector);
        if (key != null) {
            key.cancel();
            responseMessageQueue.remove(key.hashCode());
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Task that writes one response to a client.
     *
     * @author haoguo
     */
    private class WriteClientSocketHandler implements Runnable {
        private final SocketChannel client;
        private final Object responseMessage;

        /**
         * @param client          channel to write to
         * @param responseMessage payload; only {@code byte[]} and {@code String} are sent
         */
        WriteClientSocketHandler(SocketChannel client, Object responseMessage) {
            this.client = client;
            this.responseMessage = responseMessage;
        }

        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (responseMessage instanceof byte[]) {
                responseByteData = (byte[]) responseMessage;
                logResponseString = new String(responseByteData, StandardCharsets.UTF_8);
            } else if (responseMessage instanceof String) {
                logResponseString = (String) responseMessage;
                responseByteData = logResponseString.getBytes(StandardCharsets.UTF_8);
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                ByteBuffer outgoing = ByteBuffer.wrap(responseByteData);
                // Several write tasks can target one channel; serialise them so
                // partial writes of different responses cannot interleave.
                synchronized (client) {
                    // A non-blocking write may accept only part of the buffer.
                    while (outgoing.hasRemaining()) {
                        if (client.write(outgoing) == 0) {
                            Thread.yield();
                        }
                    }
                }
                // The key may already be cancelled, in which case keyFor returns null.
                SelectionKey key = client.keyFor(selector);
                String clientId = key == null ? "unknown" : String.valueOf(key.hashCode());
                System.out.println("server响应客户端[" + clientId + "]数据 :[" + logResponseString
                        + "]");
            } catch (IOException e) {
                e.printStackTrace();
                closeClient(client);
            }
        }
    }

    /**
     * Task that drains everything currently readable from a client and queues
     * the response.
     *
     * @author haoguo
     */
    private class ReadClientSocketHandler implements Runnable {
        private final SocketChannel client;
        private final ByteBuffer tmp = ByteBuffer.allocate(1024);
        private final SelectionKey selectionKey;
        private final int keyHash;

        /**
         * @param selectionKey the readable key; its channel must be a {@link SocketChannel}
         */
        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
            this.keyHash = selectionKey.hashCode();
        }

        @Override
        public void run() {
            try {
                ByteArrayOutputStream received = new ByteArrayOutputStream();
                tmp.clear();
                int len;
                while ((len = client.read(tmp)) > 0) {
                    received.write(tmp.array(), 0, len);
                    tmp.clear();
                }
                String prefix = "";
                if (received.size() > 0) {
                    String readData = new String(received.toByteArray(), StandardCharsets.UTF_8);
                    // Requests shorter than three characters must not throw.
                    prefix = readData.substring(0, Math.min(3, readData.length()));
                    System.out.println("接收到客户端[" + keyHash + "]数据 :[" + prefix + "]");
                }
                if (len < 0) {
                    // End of stream: the peer closed. Without this the key stays
                    // readable forever and the select loop spins.
                    System.out.println("客户端[" + keyHash + "]关闭了连接");
                    releaseClient();
                    return;
                }
                if (received.size() == 0) {
                    return;
                }
                // dosomthing
                byte[] response = ("response" + prefix).getBytes(StandardCharsets.UTF_8);
                responsesFor(keyHash).add(response);
                // Ask the select loop to flush the queue on the next writable event.
                client.register(selector, SelectionKey.OP_WRITE);
            } catch (IOException | CancelledKeyException e) {
                System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                releaseClient();
            } finally {
                // Allow the select loop to schedule the next read for this key.
                selectionKeyMap.remove(keyHash);
            }
        }

        /** Closes the client and drops its response queue even if the key is already gone. */
        private void releaseClient() {
            closeClient(client);
            responseMessageQueue.remove(keyHash);
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}
相关栏目:

用户点评