关闭 x
IT技术网
    技 采 号
    ITJS.cn - 技术改变世界
    • 实用工具
    • 菜鸟教程
    IT采购网 中国存储网 科技号 CIO智库

    IT技术网

    IT采购网
    • 首页
    • 行业资讯
    • 系统运维
      • 操作系统
        • Windows
        • Linux
        • Mac OS
      • 数据库
        • MySQL
        • Oracle
        • SQL Server
      • 网站建设
    • 人工智能
    • 半导体芯片
    • 笔记本电脑
    • 智能手机
    • 智能汽车
    • 编程语言
    IT技术网 - ITJS.CN
    首页 » JAVA »分布式消息系统Kafka Java客户端代码

    分布式消息系统Kafka Java客户端代码

    2014-08-12 00:00:00 出处:ITJS
    分享

    介绍

    kafka是一种高吞吐量的分布式发布订阅消息系统。

    kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)

    当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。

    高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理

    测试环境

    kafka_2.10-0.8.1.1 3个节点做的集群

    zookeeper-3.4.5 一个实例节点

    代码示例

    消息生产者代码示例

    import java.util.Collections;
    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    /**
    * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
    * @author Fung
    *
    */
    public class ProducerDemo {
    public static void main(String[] args) {
    Random rnd = new Random();
    int events=100;

    // 设置配置属性
    Properties props = new Properties();
    props.put(“metadata.broker.list”,”172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092″);
    props.put(“serializer.class”, ”kafka.serializer.StringEncoder”);
    // key.serializer.class默认为serializer.class
    props.put(“key.serializer.class”, ”kafka.serializer.StringEncoder”);
    // 可选配置,如果不配置,则使用默认的partitioner
    props.put(“partitioner.class”, ”com.catt.kafka.demo.PartitionerDemo”);
    // 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失
    // 值为0,1,-1,可以参考
    // http://kafka.apache.org/08/configuration.html
    props.put(“request.required.acks”, ”1″);
    ProducerConfig config = new ProducerConfig(props);

    // 创建producer
    Producer<String, String> producer = new Producer<String, String>(config);
    // 产生并发送消息
    long start=System.currentTimeMillis();
    for (long i = 0; i < events; i++) {
    long runtime = new Date().getTime();
    String ip = ”192.168.2.” + i;//rnd.nextInt(255);
    String msg = runtime + ”,www.example.com,” + ip;
    //如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0
    KeyedMessage<String, String> data = new KeyedMessage<String, String>(
    “page_visits”, ip, msg);
    producer.send(data);
    }
    System.out.println(“耗时:” + (System.currentTimeMillis() - start));
    // 关闭producer
    producer.close();
    }
    }

    消息消费者代码示例

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    /**
    * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
    *
    * @author Fung
    *
    */
    public class ConsumerDemo {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
    this.topic = a_topic;
    }

    public void shutdown() {
    if (consumer != null)
    consumer.shutdown();
    if (executor != null)
    executor.shutdown();
    }

    public void run(int numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
    .createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    executor = Executors.newFixedThreadPool(numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
    executor.submit(new ConsumerMsgTask(stream, threadNumber));
    threadNumber++;
    }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
    String a_groupId) {
    Properties props = new Properties();
    props.put(“zookeeper.connect”, a_zookeeper);
    props.put(“group.id”, a_groupId);
    props.put(“zookeeper.session.timeout.ms”, ”400″);
    props.put(“zookeeper.sync.time.ms”, ”200″);
    props.put(“auto.commit.interval.ms”, ”1000″);

    return new ConsumerConfig(props);
    }

    public static void main(String[] arg) {
    String[] args = { ”172.168.63.221:2188″, ”group-1″, ”page_visits”, ”12″ };
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    int threads = Integer.parseInt(args[3]);

    ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
    demo.run(threads);

    try {
    Thread.sleep(10000);
    } catch (InterruptedException ie) {

    }
    demo.shutdown();
    }
    }

    消息处理类

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    public class ConsumerMsgTask implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
    m_threadNumber = threadNumber;
    m_stream = stream;
    }

    public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext())
    System.out.println(“Thread ” + m_threadNumber + ”: ”
    + new String(it.next().message()));
    System.out.println(“Shutting down Thread: ” + m_threadNumber);
    }
    }

    Partitioner类示例

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    public class PartitionerDemo implements Partitioner {
    public PartitionerDemo(VerifiableProperties props) {

    }

    @Override
    public int partition(Object obj, int numPartitions) {
    int partition = 0;
    if (obj instanceof String) {
    String key=(String)obj;
    int offset = key.lastIndexOf(‘.’);
    if (offset > 0) {
    partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
    }
    }else{
    partition = obj.toString().length() % numPartitions;
    }

    return partition;
    }

    }

    参考

    https://cwiki.apache.org/confluence/display/KAFKA/Index

    https://kafka.apache.org/

    来自:开源中国

    上一篇返回首页 下一篇

    声明: 此文观点不代表本站立场;转载务必保留本文链接;版权疑问请联系我们。

    别人在看

    电脑屏幕不小心竖起来了?别慌,快捷键搞定

    Destoon 模板存放规则及语法参考

    Destoon系统常量与变量

    Destoon系统目录文件结构说明

    Destoon 系统安装指南

    Destoon会员公司主页模板风格添加方法

    Destoon 二次开发入门

    Microsoft 将于 2026 年 10 月终止对 Windows 11 SE 的支持

    Windows 11 存储感知如何设置?了解Windows 11 存储感知开启的好处

    Windows 11 24H2 更新灾难:系统升级了,SSD固态盘不见了...

    IT头条

    Synology 更新 ActiveProtect Manager 1.1 以增强企业网络弹性和合规性

    00:43

    新的 Rubrik Agent Cloud 加速了可信的企业 AI 代理部署

    00:34

    宇树科技 G1人形机器人,拉动一辆重达1.4吨的汽车

    00:21

    Cloudera 调查发现,96% 的企业已将 AI 集成到核心业务流程中,这表明 AI 已从竞争优势转变为强制性实践

    02:05

    投资者反对马斯克 1 万亿美元薪酬方案,要求重组特斯拉董事会

    01:18

    技术热点

    大型网站的 HTTPS 实践(三):基于协议和配置的优化

    ubuntu下右键菜单添加新建word、excel文档等快捷方式

    Sublime Text 简明教程

    用户定义SQL Server函数的描述

    怎么在windows 7开始菜单中添加下载选项?

    SQL Server 2016将有哪些功能改进?

      友情链接:
    • IT采购网
    • 科技号
    • 中国存储网
    • 存储网
    • 半导体联盟
    • 医疗软件网
    • 软件中国
    • ITbrand
    • 采购中国
    • CIO智库
    • 考研题库
    • 法务网
    • AI工具网
    • 电子芯片网
    • 安全库
    • 隐私保护
    • 版权申明
    • 联系我们
    IT技术网 版权所有 © 2020-2025,京ICP备14047533号-20,Power by OK设计网

    在上方输入关键词后,回车键 开始搜索。Esc键 取消该搜索窗口。