您的位置:首页 > 数据 >
视讯!配置完Kafka集群后 通过JavaAPI方式来操作
来源:CSDN 2023-01-28 09:07:19


(资料图片仅供参考)

配置完Kafka集群后,下面通过Java API的方式来操作 需要导入的Jar包

kafka_2.10-0.8.1.1.jar    log4j-1.2.15.jar    metrics-core-2.2.0.jar    scala-library-2.10.1.jar    slf4j-api-1.7.2.jar

以上jar包均可从Kafka的发布包中找的到,在lib目录下面

生产者(Producers)

代码:

import java.util.*; import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;public class TestProducer {    public static void main(String[] args) {        long events = Long.parseLong(args[0]);        Random rnd = new Random();                //在以下属性中定义了Producer如何找到集群,序列化消息等        Properties props = new Properties();        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");        props.put("serializer.class", "kafka.serializer.StringEncoder");        props.put("partitioner.class", "example.producer.SimplePartitioner");        props.put("request.required.acks", "1");        ProducerConfig config = new ProducerConfig(props);                //定义生产者对象,该类指定了两个参数的泛型,第一个参数表示分区键值的类型,第二参数表示消息类型        Producerproducer = new Producer(config);        for (long nEvents = 0; nEvents < events; nEvents++) {                long runtime = new Date().getTime();                 String ip = “192.168.2.” + rnd.nextInt(255);                String msg = runtime + “,www.example.com,” + ip;                //发送消息到消息中介,test指定要接受消息的主题。               KeyedMessagedata = new KeyedMessage("test", ip, msg);               //执行发送               producer.send(data);        }        producer.close();    }}

Producer配置参数:

metadata.broker.list:定义一个或者多个消息中介(broker),Produder通过broker决定主题leader的位置。这里无需配置所有的broker,但建议配置多于一个。  serializer.class:定义准备传递数据给broker时使用哪个序列化器。  partitioner.class:这个是可选项,该类将决定消息将发送到哪个主题分区上。  request.required.acks:该值设置为1后,broker收到消息后将发送一个确认信息给producer。

在上述程序运行之前请确保Kafka已经存在名称为test的主题,如果没有可以使用下面命令创建      bin/kafka-create-topic.sh --topic test --replica 3--zookeeper localhost:2181--partition 5然后使用下面命令查看:      bin/kafka-console-consumer.sh --zookeeper localhost:2181--topic test --from-beginning【参考】: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

关键词:
相关文章
当前滚动:四川铁路电煤专列同比增长9.8%

当前滚动:四川铁路电煤专列同比增长9.8%

  今年春运以来,中国铁路成都局集团有限公司已抢卸电煤3万余车、190万余吨,其中,四川地区已抢卸电煤13万余车、83万余吨,卸车数同比增长9更多

2023-01-28 09:17:28
内蒙古:煤企生产忙 供应不断档

内蒙古:煤企生产忙 供应不断档

  春节假期,位于内蒙古乌海市的国家能源集团乌海能源公司加强煤炭生产组织,确保煤炭供应不断档。乌海能源公司提前制定计划,适时调整煤炭更多

2023-01-28 09:15:32
陕煤入渝发运量再创新高 2022年达1798.3万吨 环球速看料

陕煤入渝发运量再创新高 2022年达1798.3万吨 环

  据陕煤运销集团统计数据显示,2022年陕煤入渝发运量17983万吨,同比增长132%,入渝年度发运量创下历史新纪录,有效保障了重庆地区电力和重更多

2023-01-20 11:10:02
全球快看:为完成2023年煤炭供应目标 晋城市出台了7项措施

全球快看:为完成2023年煤炭供应目标 晋城市出台

  晋城市能源局消息,该市日前发布了煤炭增产保供和产能新增工作方案并提出,通过核增产能、扩产、新投产等方式,加快释放先进产能,充分发更多

2023-01-20 11:09:59
当前关注:2022年内蒙古煤电气产量对全国增长贡献均居首位

当前关注:2022年内蒙古煤电气产量对全国增长贡献

  2022年,内蒙古全区积极落实煤电增产保供工作,规模以上工业煤电气等主要能源产品产量均位居全国前列,对全国增长贡献率均位居首位,有力更多

2023-01-20 11:11:26
全球微速讯:山东:细化分解任务 坚决完成煤炭保供任务

全球微速讯:山东:细化分解任务 坚决完成煤炭保

  山东省政府工作报告提出,要抓好能源保供。菏泽代表团三名来自煤炭企业的省人大代表,围绕报告精神,就如何保障煤炭供应,加强能源保供建更多

2023-01-20 10:54:12
CCTD:短期煤市基本面仍难寻亮点

CCTD:短期煤市基本面仍难寻亮点

  据CCTD了解,节前最后一周,煤炭市场交易逐渐停滞,周初在封航港口库存迅速累积情况下,贸易商报价有小幅下跌。随后封航结束,港口调出恢更多

2023-01-20 11:02:24
每日观察!12月份内蒙古煤炭价格环比下降8.3%

每日观察!12月份内蒙古煤炭价格环比下降8.3%

  2022年12月份受疫情防控、原油价格变化,农业生产进程和建筑工程进展等因素影响,煤炭价格环比下降。据内蒙古自治区商务厅生产资料市场监更多

2023-01-19 10:54:37
促开采、增储备、抢维修、保运输,各地多措并举——力保群众温暖过冬过年 焦点速讯

促开采、增储备、抢维修、保运输,各地多措并举—

  中央经济工作会议要求,要做好岁末年初各项工作,强化市场保供稳价,加强能源调节,确保群众温暖安全过冬。冬季供暖事关千家万户,是重要更多

2023-01-19 10:08:31
生态环境部:推动保供煤矿全部按时保质完成环评办理

生态环境部:推动保供煤矿全部按时保质完成环评办

  生态环境部环境影响评价与排放管理司司长刘志全17日在新闻发布会上表示,2023年将落实环评审批三本台账和绿色通道机制,推进十四五重大工更多

2023-01-19 08:59:13