欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

Java多线程实现文件快速切分,java多线程切分,前段时间需要进行大批量数

来源: javaer 分享于  点击 41973 次 点评:157

Java多线程实现文件快速切分,java多线程切分,前段时间需要进行大批量数


前段时间需要进行大批量数据导入,DBA给提供的是CVS文件,但是每个CVS文件都好几个GB大小,直接进行load,数据库很慢还会产生内存不足的问题,为了实现这个功能,写了个快速切分文件的程序。

[Java]代码 ``` javaimport org.apache.log4j.LogManager;import org.apache.log4j.Logger;

import java.io.;import java.util.;import java.util.concurrent.*;

public class FileSplitUtil {

private final static Logger log = LogManager.getLogger(FileSplitUtil.class);private static final long originFileSize = 1024 * 1024 * 100;// 100Mprivate static final int blockFileSize = 1024 * 1024 * 64;// 防止中文乱码,必须取2的N次方/** * CVS文件分隔符 */private static final char cvsSeparator = '^';public static  void  main(String args[]){    long start = System.currentTimeMillis();    try {        String fileName = "D:\\csvtest\\aa.csv";        File sourceFile = new File(fileName);        if (sourceFile.length() >= originFileSize) {            String cvsFileName = fileName.replaceAll("\\\\", "/");            FileSplitUtil fileSplitUtil = new FileSplitUtil();            List<String> parts=fileSplitUtil.splitBySize(cvsFileName, blockFileSize);            for(String part:parts){                System.out.println("partName is:"+part);            }        }        System.out.println("总文件长度"+sourceFile.length()+",拆分文件耗时:" + (System.currentTimeMillis() - start) + "ms.");    }catch (Exception e){        log.info(e.getStackTrace());    }}/** * 拆分文件 * * @param fileName 待拆分的完整文件名 * @param byteSize 按多少字节大小拆分 * @return 拆分后的文件名列表 */public List<String> splitBySize(String fileName, int byteSize)        throws IOException, InterruptedException {    List<String> parts = new ArrayList<String>();    File file = new File(fileName);    int count = (int) Math.ceil(file.length() / (double) byteSize);    int countLen = (count + "").length();    RandomAccessFile raf = new RandomAccessFile(fileName, "r");    long totalLen = raf.length();    CountDownLatch latch = new CountDownLatch(count);    for (int i = 0; i < count; i++) {        String partFileName = file.getPath() + "."                + leftPad((i + 1) + "", countLen, '0') + ".cvs";        int readSize=byteSize;        long startPos=(long)i * byteSize;        long nextPos=(long)(i+1) * byteSize;        if(nextPos>totalLen){            readSize= (int) (totalLen-startPos);        }        new SplitRunnable(readSize, startPos, partFileName, file, latch).run();        parts.add(partFileName);    }    latch.await();//等待所有文件写完    //由于切割时可能会导致行被切断,加工所有的的分割文件,合并行    mergeRow(parts);    return parts;}/** * 分割处理Runnable * * @author supeidong */private class SplitRunnable implements Runnable {    int byteSize;    String partFileName;    File originFile;    long startPos;    CountDownLatch latch;    public SplitRunnable(int byteSize, long startPos, String partFileName,                         File originFile, CountDownLatch latch) {        this.startPos = startPos;        this.byteSize = byteSize;        this.partFileName = partFileName;        this.originFile = originFile;        this.latch = latch;    }    public void run() {        RandomAccessFile rFile;        OutputStream os;        try {            rFile = new RandomAccessFile(originFile, "r");            byte[] b = new byte[byteSize];            rFile.seek(startPos);// 移动指针到每“段”开头            int s = rFile.read(b);            os = new FileOutputStream(partFileName);            os.write(b, 0, s);            os.flush();            os.close();            latch.countDown();        } catch (IOException e) {            log.error(e.getMessage());            latch.countDown();        }    }}/** * 合并被切断的行 * * @param parts */private void mergeRow(List<String> parts) {    List<PartFile> partFiles = new ArrayList<PartFile>();    try {        //组装被切分表对象        for (int i=0;i<parts.size();i++) {            String partFileName=parts.get(i);            File splitFileTemp = new File(partFileName);            if (splitFileTemp.exists()) {                PartFile partFile = new PartFile();                BufferedReader reader=new BufferedReader(new InputStreamReader(new FileInputStream(splitFileTemp),"gbk"));                String firstRow = reader.readLine();                String secondRow = reader.readLine();                String endRow = readLastLine(partFileName);                partFile.setPartFileName(partFileName);                partFile.setFirstRow(firstRow);                partFile.setEndRow(endRow);                if(i>=1){                    String prePartFile=parts.get(i - 1);                    String preEndRow = readLastLine(prePartFile);                    partFile.setFirstIsFull(getCharCount(firstRow+preEndRow)>getCharCount(secondRow));                }                partFiles.add(partFile);                reader.close();            }        }        //进行需要合并的行的写入        for (int i = 0; i < partFiles.size() - 1; i++) {            PartFile partFile = partFiles.get(i);            PartFile partFileNext = partFiles.get(i + 1);            StringBuilder sb = new StringBuilder();            if (partFileNext.getFirstIsFull()) {                sb.append("\r\n");                sb.append(partFileNext.getFirstRow());            } else {                sb.append(partFileNext.getFirstRow());            }            writeLastLine(partFile.getPartFileName(),sb.toString());        }    } catch (Exception e) {        log.error(e.getMessage());    }}/** * 得到某个字符出现的次数 * @param s * @return */private int getCharCount(String s) {    int count = 0;    for (int i = 0; i < s.length(); i++) {        if (s.charAt(i) == cvsSeparator) {            count++;        }    }    return count;}/** * 采用BufferedInputStream方式读取文件行数 * * @param filename * @return */public int getFileRow(String filename) throws IOException {    InputStream is = new BufferedInputStream(new FileInputStream(filename));    byte[] c = new byte[1024];    int count = 0;    int readChars = 0;    while ((readChars = is.read(c)) != -1) {        for (int i = 0; i < readChars; ++i) {            if (c[i] == '\n')                ++count;        }    }    is.close();    return count;}/** * 读取最后一行数据 * @param filename * @return * @throws IOException */private String readLastLine(String filename) throws IOException {    // 使用RandomAccessFile , 从后找最后一行数据    RandomAccessFile raf = new RandomAccessFile(filename, "r");    long len = raf.length();    String lastLine = "";    if(len!=0L) {        long pos = len - 1;        while (pos > 0) {            pos--;            raf.seek(pos);            if (raf.readByte() == '\n') {                lastLine = raf.readLine();                lastLine=new String(lastLine.getBytes("8859_1"), "gbk");                break;            }        }    }    raf.close();    return lastLine;}/** * 修改最后一行数据 * @param fileName * @param lastString * @return * @throws IOException */private void writeLastLine(String fileName,String lastString){    try {        // 打开一个随机访问文件流,按读写方式        RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw");        // 文件长度,字节数        long fileLength = randomFile.length();        //将写文件指针移到文件尾。        randomFile.seek(fileLength);        //此处必须加gbk,否则会出现写入乱码        randomFile.write(lastString.getBytes("gbk"));        randomFile.close();    } catch (IOException e) {        log.error(e.getMessage());    }}/** * 左填充 * * @param str * @param length * @param ch * @return */public static String leftPad(String str, int length, char ch) {    if (str.length() >= length) {        return str;    }    char[] chs = new char[length];    Arrays.fill(chs, ch);    char[] src = str.toCharArray();    System.arraycopy(src, 0, chs, length - src.length, src.length);    return new String(chs);}/** * 合并文件行内部类 */class PartFile {    private String partFileName;    private String firstRow;    private String endRow;    private boolean firstIsFull;    public String getPartFileName() {        return partFileName;    }    public void setPartFileName(String partFileName) {        this.partFileName = partFileName;    }    public String getFirstRow() {        return firstRow;    }    public void setFirstRow(String firstRow) {        this.firstRow = firstRow;    }    public String getEndRow() {        return endRow;    }    public void setEndRow(String endRow) {        this.endRow = endRow;    }    public boolean getFirstIsFull() {        return firstIsFull;    }    public void setFirstIsFull(boolean firstIsFull) {        this.firstIsFull = firstIsFull;    }}

}

```

相关栏目:

用户点评