欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

从 MQ 抓取消息,MQ 抓取消息,MQ 消息拉取者(MqPuller)

来源: javaer 分享于  点击 44156 次 点评:266

从 MQ 抓取消息,MQ 抓取消息,MQ 消息拉取者(MqPuller)


/** * MQ 消息拉取者 * */class MqPuller{    private static final Logger logger = Logger.getLogger(MqPuller.class);    private Ice.Communicator ic;    private MQPollPrx mqPoll;    private String proxy;//要连接的ICE服务    private String token;//权限令牌    private String uri;//分配URL    private String queueNames;//消息类型    private int count;//每次拉取多少条消息    private boolean debugMode;    /**     * MQ 客户端初始化     *      * Properties default value is:     *      * jq.proxy=MQPollS:tcp -p 10000 -h 10.10.224.63     * jq.token=qazwsx     * jq.uri=test     * jq.queueNames=31.30,31.40,31.50     * jq.count=100     * jq.debugMode=true     *      * @param prop     */    public void init(Properties prop) throws Exception{        if (prop == null) {            logger.error("init() parameter prop is null.");            throw new IllegalArgumentException("prop cannot null!");        }        logger.info("init()...");        proxy = prop.getProperty("jq.proxy", "MQPollS:tcp -p 10000 -h 10.10.224.63");        token = prop.getProperty("jq.token", "654321");        uri = prop.getProperty("jq.uri", "test");        queueNames = prop.getProperty("jq.queueNames", "31.30,31.40,31.50");        count = Integer.parseInt(prop.getProperty("jq.count", "100"));        debugMode = Boolean.parseBoolean(prop.getProperty("jq.debugMode", "true"));        logger.info("jq.proxy=" + proxy );        logger.info("jq.token=" + token );        logger.info("jq.uri=" + uri );        logger.info("jq.queueNames=" + queueNames );        logger.info("jq.count=" + count );        logger.info("jq.debugMode=" + debugMode );        if(ic == null || ic.isShutdown()){            ic = Ice.Util.initialize("--Ice.MessageSizeMax=2000".split(";"));            Ice.ObjectPrx objPrx = ic.stringToProxy(proxy);            mqPoll = MQPollPrxHelper.checkedCast(objPrx);            Map<String, String> map = new HashMap<String, String>();            map.put("token", token);            mqPoll = (MQPollPrx) mqPoll.ice_context(map);            
logger.info("init() complete!");        }        else{            logger.info("init: ic is running !");        }    }    /**     * 从MQ拉取消息     *      * @return List<String[]>  {queueName, msgId, msgContent}     */    public List<String[]> pullMessage() throws Exception{        List<String[]> result = new ArrayList<String[]>();        String[] queueNamesArray = queueNames.split(",");        for (String queueName : queueNamesArray) {            List<Integer> ids = mqPoll.pollIds(queueName, uri, count, 0);            if(ids != null && ids.size()>0){                for (Integer msgId : ids) {                    String msgContent = mqPoll.pollMsg(msgId);                    if(msgContent != null && msgContent.trim().length()>0){                        String arr[] = {queueName, msgId.toString(), msgContent};                        result.add(arr);                    }                }            }        }        return result;    }    public boolean commitMessage(String msgid) throws Exception{        return mqPoll.CommitMsg(msgid);    }    public boolean isInit() throws Exception{        return (ic != null && !ic.isShutdown());    }    public boolean isDebugMode() {        return debugMode;    }//该片段来自于http://byrx.net
相关栏目:

用户点评