博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka与.net core(三)kafka操作
阅读量:4840 次
发布时间:2019-06-11

本文共 5496 字,大约阅读时间需要 18 分钟。

1.Kafka相关知识

  • Broker:即Kafka的服务器,用户存储消息,Kafa集群中的一台或多台服务器统称为broker。
  • Message消息:是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。
    • Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。
    • partition中的每条Message包含了以下三个属性:Kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存可以将一个topic切分多任意多个partitions,来消息保存/消费的效率。
      • offset:消息唯一标识:对应类型:long
      • MessageSize 对应类型:int32
      • data 是message的具体内容。
    • 越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。
  • Message:在Broker中通Log追加的方式进行持久化存储。并进行分区(patitions)。
    • 一个Topic可以认为是一类消息,每个topic将被分成多partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),partition是以文件的形式存储在文件系统中。
    • Logs文件根据broker中的配置要求,保留一定时间后删除来释放磁盘空间。

      

    • Topic物理上的分组,一个 topic可以分为多个 partition,每个 partition 是一个有序的队列。partition中的每条消息都会被分配一个有序的 id(offset)。
    • 为实现稀疏存储,我们通过给文件建索引,每隔一定字节的数据建立一条索引

       

  • 为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。
  • Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。Message消息是有多份的。
  • consumer:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
    • 在 kafka中,我们可以认为一个group是一个订阅者,一个Topic中的每个partions,只会被一个订阅者中的一个consumer消费,不过一个 consumer可以消费多个partitions中的消息(消费者数据小于Partions  的数量时)。注意:kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。
    • 一个partition中的消息只会被group中的一个consumer消息。每个group中consumer消息消费互相独立。
  • 无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
  • 消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息。

2.kafka操作

2.1.查看有哪些主题:

kafka-topics.sh --list --zookeeper 192.168.0.201:12181

2.2.查看topic的详细信息

kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1

2.3.为topic增加副本

kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute

2.4.创建topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1

2.5为topic增加partition

bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1

2.6kafka生产者客户端命令

kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1

2.7kafka消费者客户端命令

kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1

2.8kafka服务启动

kafka-server-start.sh -daemon ../config/server.properties

3..net core操作

producer端,引入Confluent.Kafka

Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka;using System;using System.Collections.Generic;using System.Text;using System.Threading.Tasks;namespace KafkaTest{    class Program    {        static void Main(string[] args)        {            Test().Wait();        }        static async Task Test()        {           var conf = new ProducerConfig { BootstrapServers = "39.**.**.**:9092" };            Action
> handler = r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}"); using (var p = new Producer
(conf)) { for (int i = 0; i < 100000; ++i) { p.BeginProduce("my-topic", new Message
{ Value = i.ToString() }, handler); } // wait for up to 10 seconds for any inflight messages to be delivered. p.Flush(TimeSpan.FromSeconds(10)); } } }}

consumer端,引入Confluent.Kafka

Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka;using System;using System.Linq;using System.Text;namespace KafkaClient{    class Program    {        static void Main(string[] args)        {                        var conf = new ConsumerConfig            {                GroupId = "test-consumer-group4",                BootstrapServers = "39.**.**.**:9092",                // Note: The AutoOffsetReset property determines the start offset in the event                // there are not yet any committed offsets for the consumer group for the                // topic/partitions of interest. By default, offsets are committed                // automatically, so in this example, consumption will only start from the                // earliest message in the topic 'my-topic' the first time you run the program.                AutoOffsetReset = AutoOffsetResetType.Earliest            };            using (var c = new Consumer
(conf)) { c.Subscribe("my-topic"); bool consuming = true; // The client will automatically recover from non-fatal errors. You typically // don't need to take any action unless an error is marked as fatal. c.OnError += (_, e) => consuming = !e.IsFatal; while (consuming) { try { var cr = c.Consume(); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } // Ensure the consumer leaves the group cleanly and final offsets are committed. c.Close(); } } }}

 

转载于:https://www.cnblogs.com/chenyishi/p/10250768.html

你可能感兴趣的文章
运行第一个OpenCV程序
查看>>
算法笔记_003:矩阵相乘问题【分治法】
查看>>
算法笔记_017:递归执行顺序的探讨(Java)
查看>>
牛顿法与拟牛顿法学习笔记(四)BFGS 算法
查看>>
ninth week (1)
查看>>
C float与char数组 互转
查看>>
异步线程中开启定时器
查看>>
正则表达式与unicode
查看>>
abp(net core)+easyui+efcore实现仓储管理系统——ABP总体介绍(一)
查看>>
div水平居中与垂直居中的方法【摘自美浩工作室官方博客 】
查看>>
UITableView 滚动条
查看>>
Android已有的原生Camera框架中加入自己的API的实现方案。
查看>>
Learn python the ninth day
查看>>
Debian+Django+uWsgi+nginx+mysql+celery
查看>>
docker 基本操作
查看>>
无缝滚动的float属性
查看>>
价值观作业
查看>>
char , unsigned char 和 signed char 区别
查看>>
挂起布局逻辑与恢复布局逻辑
查看>>
back to back
查看>>