从 MQ 抓取消息,MQ 抓取消息,/** * MQ 消息拉取者 */
分享于 点击 44156 次 点评:266
从 MQ 抓取消息,MQ 抓取消息,/** * MQ 消息拉取者 */
/** * MQ 消息拉取者 * */class MqPuller{ private static final Logger logger = Logger.getLogger(MqPuller.class); private Ice.Communicator ic; private MQPollPrx mqPoll; private String proxy;//要连接的ICE服务 private String token;//权限令牌 private String uri;//分配URL private String queueNames;//消息类型 private int count;//每次拉取多少条消息 private boolean debugMode; /** * MQ 客户端初始化 * * Properties default value is: * * jq.proxy=MQPollS:tcp -p 10000 -h 10.10.224.63 * jq.token=qazwsx * jq.uri=test * jq.queueNames=31.30,31.40,31.50 * jq.count=100 * jq.debugMode=true * * @param prop */ public void init(Properties prop) throws Exception{ if (prop == null) { logger.error("init() parameter prop is null."); throw new IllegalArgumentException("prop cannot null!"); } logger.info("init()..."); proxy = prop.getProperty("jq.proxy", "MQPollS:tcp -p 10000 -h 10.10.224.63"); token = prop.getProperty("jq.token", "654321"); uri = prop.getProperty("jq.uri", "test"); queueNames = prop.getProperty("jq.queueNames", "31.30,31.40,31.50"); count = Integer.parseInt(prop.getProperty("jq.count", "100")); debugMode = Boolean.parseBoolean(prop.getProperty("jq.debugMode", "true")); logger.info("jq.proxy=" + proxy ); logger.info("jq.token=" + token ); logger.info("jq.uri=" + uri ); logger.info("jq.queueNames=" + queueNames ); logger.info("jq.count=" + count ); logger.info("jq.debugMode=" + debugMode ); if(ic == null || ic.isShutdown()){ ic = Ice.Util.initialize("--Ice.MessageSizeMax=2000".split(";")); Ice.ObjectPrx objPrx = ic.stringToProxy(proxy); mqPoll = MQPollPrxHelper.checkedCast(objPrx); Map<String, String> map = new HashMap<String, String>(); map.put("token", token); mqPoll = (MQPollPrx) mqPoll.ice_context(map); logger.info("init() complete!"); } else{ logger.info("init: ic is running !"); } } /** * 从MQ拉取消息 * * @return List<String[]> {queueName, msgId, msgContent} */ public List<String[]> pullMessage() throws Exception{ List<String[]> result = new ArrayList<String[]>(); String[] queueNamesArray = queueNames.split(","); 
for (String queueName : queueNamesArray) { List<Integer> ids = mqPoll.pollIds(queueName, uri, count, 0); if(ids != null && ids.size()>0){ for (Integer msgId : ids) { String msgContent = mqPoll.pollMsg(msgId); if(msgContent != null && msgContent.trim().length()>0){ String arr[] = {queueName, msgId.toString(), msgContent}; result.add(arr); } } } } return result; } public boolean commitMessage(String msgid) throws Exception{ return mqPoll.CommitMsg(msgid); } public boolean isInit() throws Exception{ return (ic != null && !ic.isShutdown()); } public boolean isDebugMode() { return debugMode; }//该片段来自于http://byrx.net
用户点评