关闭 x
IT技术网
    技 采 号
    ITJS.cn - 技术改变世界
    • 实用工具
    • 菜鸟教程
    IT采购网 中国存储网 科技号 CIO智库

    IT技术网

    IT采购网
    • 首页
    • 行业资讯
    • 系统运维
      • 操作系统
        • Windows
        • Linux
        • Mac OS
      • 数据库
        • MySQL
        • Oracle
        • SQL Server
      • 网站建设
    • 人工智能
    • 半导体芯片
    • 笔记本电脑
    • 智能手机
    • 智能汽车
    • 编程语言
    IT技术网 - ITJS.CN
    首页 » JAVA »分布式消息系统Kafka Java客户端代码

    分布式消息系统Kafka Java客户端代码

    2014-08-12 00:00:00 出处:ITJS
    分享

    介绍

    kafka是一种高吞吐量的分布式发布订阅消息系统。

    kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)

    当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。

    高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理

    测试环境

    kafka_2.10-0.8.1.1 3个节点做的集群

    zookeeper-3.4.5 一个实例节点

    代码示例

    消息生产者代码示例

    import java.util.Collections;
    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    /**
    * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
    * @author Fung
    *
    */
    public class ProducerDemo {
    public static void main(String[] args) {
    Random rnd = new Random();
    int events=100;

    // 设置配置属性
    Properties props = new Properties();
    props.put(“metadata.broker.list”,”172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092″);
    props.put(“serializer.class”, ”kafka.serializer.StringEncoder”);
    // key.serializer.class默认为serializer.class
    props.put(“key.serializer.class”, ”kafka.serializer.StringEncoder”);
    // 可选配置,如果不配置,则使用默认的partitioner
    props.put(“partitioner.class”, ”com.catt.kafka.demo.PartitionerDemo”);
    // 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失
    // 值为0,1,-1,可以参考
    // http://kafka.apache.org/08/configuration.html
    props.put(“request.required.acks”, ”1″);
    ProducerConfig config = new ProducerConfig(props);

    // 创建producer
    Producer<String, String> producer = new Producer<String, String>(config);
    // 产生并发送消息
    long start=System.currentTimeMillis();
    for (long i = 0; i < events; i++) {
    long runtime = new Date().getTime();
    String ip = ”192.168.2.” + i;//rnd.nextInt(255);
    String msg = runtime + ”,www.example.com,” + ip;
    //如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0
    KeyedMessage<String, String> data = new KeyedMessage<String, String>(
    “page_visits”, ip, msg);
    producer.send(data);
    }
    System.out.println(“耗时:” + (System.currentTimeMillis() - start));
    // 关闭producer
    producer.close();
    }
    }

    消息消费者代码示例

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    /**
    * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
    *
    * @author Fung
    *
    */
    public class ConsumerDemo {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
    this.topic = a_topic;
    }

    public void shutdown() {
    if (consumer != null)
    consumer.shutdown();
    if (executor != null)
    executor.shutdown();
    }

    public void run(int numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
    .createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    executor = Executors.newFixedThreadPool(numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
    executor.submit(new ConsumerMsgTask(stream, threadNumber));
    threadNumber++;
    }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
    String a_groupId) {
    Properties props = new Properties();
    props.put(“zookeeper.connect”, a_zookeeper);
    props.put(“group.id”, a_groupId);
    props.put(“zookeeper.session.timeout.ms”, ”400″);
    props.put(“zookeeper.sync.time.ms”, ”200″);
    props.put(“auto.commit.interval.ms”, ”1000″);

    return new ConsumerConfig(props);
    }

    public static void main(String[] arg) {
    String[] args = { ”172.168.63.221:2188″, ”group-1″, ”page_visits”, ”12″ };
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    int threads = Integer.parseInt(args[3]);

    ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
    demo.run(threads);

    try {
    Thread.sleep(10000);
    } catch (InterruptedException ie) {

    }
    demo.shutdown();
    }
    }

    消息处理类

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    public class ConsumerMsgTask implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
    m_threadNumber = threadNumber;
    m_stream = stream;
    }

    public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext())
    System.out.println(“Thread ” + m_threadNumber + ”: ”
    + new String(it.next().message()));
    System.out.println(“Shutting down Thread: ” + m_threadNumber);
    }
    }

    Partitioner类示例

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    public class PartitionerDemo implements Partitioner {
    public PartitionerDemo(VerifiableProperties props) {

    }

    @Override
    public int partition(Object obj, int numPartitions) {
    int partition = 0;
    if (obj instanceof String) {
    String key=(String)obj;
    int offset = key.lastIndexOf(‘.’);
    if (offset > 0) {
    partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
    }
    }else{
    partition = obj.toString().length() % numPartitions;
    }

    return partition;
    }

    }

    参考

    https://cwiki.apache.org/confluence/display/KAFKA/Index

    https://kafka.apache.org/

    来自:开源中国

    上一篇返回首页 下一篇

    声明: 此文观点不代表本站立场;转载务必保留本文链接;版权疑问请联系我们。

    别人在看

    正版 Windows 11产品密钥怎么查找/查看?

    还有3个月,微软将停止 Windows 10 的更新

    Windows 10 终止支持后,企业为何要立即升级?

    Windows 10 将于 2025年10 月终止技术支持,建议迁移到 Windows 11

    Windows 12 发布推迟,微软正全力筹备Windows 11 25H2更新

    Linux 退出 mail的命令是什么

    Linux 提醒 No space left on device,但我的空间看起来还有不少空余呢

    hiberfil.sys文件可以删除吗?了解该文件并手把手教你删除C盘的hiberfil.sys文件

    Window 10和 Windows 11哪个好?答案是:看你自己的需求

    盗版软件成公司里的“隐形炸弹”?老板们的“法务噩梦” 有救了!

    IT头条

    公安部:我国在售汽车搭载的“智驾”系统都不具备“自动驾驶”功能

    02:03

    液冷服务器概念股走强,博汇、润泽等液冷概念股票大涨

    01:17

    亚太地区的 AI 驱动型医疗保健:2025 年及以后的下一步是什么?

    16:30

    智能手机市场风云:iPhone领跑销量榜,华为缺席引争议

    15:43

    大数据算法和“老师傅”经验叠加 智慧化收储粮食尽显“科技范”

    15:17

    技术热点

    商业智能成CIO优先关注点 技术落地方显成效(1)

    用linux安装MySQL时产生问题破解

    JAVA中关于Map的九大问题

    windows 7旗舰版无法使用远程登录如何开启telnet服务

    Android View 事件分发机制详解

    MySQL用户变量的用法

      友情链接:
    • IT采购网
    • 科技号
    • 中国存储网
    • 存储网
    • 半导体联盟
    • 医疗软件网
    • 软件中国
    • ITbrand
    • 采购中国
    • CIO智库
    • 考研题库
    • 法务网
    • AI工具网
    • 电子芯片网
    • 安全库
    • 隐私保护
    • 版权申明
    • 联系我们
    IT技术网 版权所有 © 2020-2025,京ICP备14047533号-20,Power by OK设计网

    在上方输入关键词后,回车键 开始搜索。Esc键 取消该搜索窗口。