网站镜像上传到域名空间,网络直接销售的营销方式,企点官网下载,怎么做网站广告古董目录
1.TopicCommand 1.1.Topic创建1.2.删除Topic1.3.Topic分区扩容1.4.查询Topic描述5.查询Topic列表 2.ConfigCommand 2.1 查询配置2.2 增删改 配置 --alter 3.副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions #xff08;3.1#xff09;脚本的使用介绍 4.Topi…目录1.TopicCommand1.1.Topic创建1.2.删除Topic1.3.Topic分区扩容1.4.查询Topic描述5.查询Topic列表2.ConfigCommand2.1 查询配置2.2 增删改 配置 --alter3.副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions3.1脚本的使用介绍4.Topic的发送kafka-console-producer.sh5. Topic的消费kafka-console-consumer.sh6.kafka-leader-election Leader重新选举7. 持续批量推送消息kafka-verifiable-producer.sh8. 持续批量拉取消息kafka-verifiable-consumer9.生产者压力测试kafka-producer-perf-test.sh10.消费者压力测试kafka-consumer-perf-test.sh11.删除指定分区的消息kafka-delete-records.sh12. 查看Broker磁盘信息12. 消费者组管理 kafka-consumer-groups.sh1. 查看消费者列表–list2. 查看消费者组详情–describe3. 删除消费者组–delete4. 重置消费组的偏移量 --reset-offsets5. 删除偏移量delete-offsets附件1.TopicCommand1.1.Topic创建bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test相关可选参数参数描述例子--bootstrap-server指定kafka服务指定连接到的kafka服务; 如果有这个参数,则--zookeeper可以不需要–bootstrap-server localhost:9092--zookeeper弃用, 通过zk的连接方式连接到kafka集群;–zookeeper localhost:2181 或者localhost:2181/kafka--replication-factor副本数量,注意不能大于broker数量;如果不提供,则会用集群中默认配置–replication-factor 3--partitions分区数量,当创建或者修改topic的时候,用这个来指定分区数;如果创建的时候没有提供参数,则用集群中默认值; 注意如果是修改的时候,分区比之前小会有问题–partitions 3--replica-assignment副本分区分配方式;创建topic的时候可以自己指定副本分配情况;--replica-assignmentBrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表示副本--configString: namevalue用来设置topic级别的配置以覆盖默认配置;只在–create 和–bootstrap-server 同时使用时候生效; 可以配置的参数列表请看文末附件例如覆盖两个配置--config retention.bytes123455 --config retention.ms600001--command-configString: command 文件路径用来配置客户端Admin Client启动配置,只在–bootstrap-server 同时使用时候生效;例如:设置请求的超时时间--command-config config/producer.proterties; 然后在文件中配置 request.timeout.ms3000001.2.删除Topicbin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test支持正则表达式匹配Topic来进行删除,只需要将topic 用双引号包裹起来例如: 删除以create_topic_byhand_zk为开头的topic;bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic “create_topic_byhand_zk.*”.表示任意匹配除换行符 \n 之外的任何单字符。要匹配 . 请使用 . 。·*·匹配前面的子表达式零次或多次。要匹配 * 字符请使用 *。.*: 任意字符删除任意Topic (慎用)bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic “.*?”更多的用法请参考正则表达式1.3.Topic分区扩容zk方式(不推荐)bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2kafka版本 2.2 支持下面方式推荐单个Topic扩容bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4批量扩容(将所有正则表达式匹配到的Topic分区扩容到4个)sh bin/kafka-topics.sh --topic .*? --bootstrap-server 172.23.248.85:9092 --alter --partitions 4.*?正则表达式的意思是匹配所有; 您可按需匹配PS:当某个Topic的分区少于指定的分区数时候,他会抛出异常;但是不会影响其他Topic正常进行;相关可选参数 | 参数 |描述 |例子| |–|–|–| |--replica-assignment|副本分区分配方式;创建topic的时候可以自己指定副本分配情况; |--replica-assignmentBrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表示副本|PS: 虽然这里配置的是全部的分区副本分配配置,但是正在生效的是新增的分区;比如: 以前3分区1副本是这样的Broker-1Broker-2Broker-3Broker-4012现在新增一个分区,--replica-assignment2,1,3,4 ; 看这个意思好像是把01号分区互相换个BrokerBroker-1Broker-2Broker-3Broker-41023但是实际上不会这样做,Controller在处理的时候会把前面3个截掉; 只取新增的分区分配方式,原来的还是不会变Broker-1Broker-2Broker-3Broker-401231.4.查询Topic描述1.查询单个Topicsh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal2.批量查询Topic(正则表达式匹配,下面是查询所有Topic)sh bin/kafka-topics.sh --topic .*? --bootstrap-server xxxx:9092 --describe --exclude-internal支持正则表达式匹配Topic,只需要将topic 用双引号包裹起来相关可选参数参数描述例子--bootstrap-server指定kafka服务指定连接到的kafka服务; 如果有这个参数,则--zookeeper可以不需要–bootstrap-server localhost:9092--at-min-isr-partitions查询的时候省略一些计数和配置信息--at-min-isr-partitions--exclude-internal排除kafka内部topic,比如__consumer_offsets-*--exclude-internal--topics-with-overrides仅显示已覆盖配置的主题,也就是单独针对Topic设置的配置覆盖默认配置不展示分区信息--topics-with-overrides5.查询Topic列表1.查询所有Topic列表sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal2.查询匹配Topic列表(正则表达式)查询test_create_开头的所有Topic列表sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic test_create_.*相关可选参数参数描述例子--exclude-internal排除kafka内部topic,比如__consumer_offsets-*--exclude-internal--topic可以正则表达式进行匹配,展示topic名称--topic回到顶部2.ConfigCommandConfig相关操作; 动态配置可以覆盖默认的静态配置;2.1 查询配置Topic配置查询展示关于Topic的动静态配置1.查询单个Topic配置(只列举动态配置)sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic或者sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic2.查询所有Topic配置(包括内部Topic)(只列举动态配置)sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics3.查询Topic的详细配置(动态静态)只需要加上一个参数--all其他配置/clients/users/brokers/broker-loggers 的查询同理 只需要将--entity-type改成对应的类型就行了 (topics/clients/users/brokers/broker-loggers)查询kafka版本信息sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version所有可配置的动态配置 请看最后面的 *附件* 部分2.2 增删改 配置--alter–alter删除配置:--delete-configk1v1,k2v2添加/修改配置:--add-configk1,k2选择类型:--entity-type(topics/clients/users/brokers/broker- loggers)类型名称:--entity-nameTopic添加/修改动态配置--add-configsh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms222222,retention.ms999999Topic删除动态配置--delete-configsh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms其他配置同理,只需要类型改下--entity-type类型有: (topics/clients/users/brokers/broker- loggers)哪些配置可以修改 请看最后面的附件ConfigCommand 的一些可选配置回到顶部3.副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions请戳 【kafka运维】副本扩缩容、数据迁移、副本重分配、副本跨路径迁移 (如果点不出来,表示文章暂未发表,请耐心等待)【kafka运维】你真的懂数据迁移吗3.1脚本的使用介绍关键参数--generate1》构造文件cd /data/kafkavim move.json{ topics: \[ {topic: test} \], version: 1 }运行 generate 参数生成 当前副本的配置介绍 json以及建议修改的 jsonkafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file ./move.json --broker-list 0,1,2 --generate Current partition replica assignment {version:1,partitions:\[{topic:test,partition:0,replicas:\[1,0,2\],log\_dirs:\[any,any,any\]},{topic:test,partition:1,replicas:\[0,2,1\],log\_dirs:\[any,any,any\]},{topic:test,partition:2,replicas:\[2,1,0\],log\_dirs:\[any,any,any\]}\]} Proposed partition reassignment configuration {version:1,partitions:\[{topic:test,partition:0,replicas:\[1,0,2\],log\_dirs:\[any,any,any\]},{topic:test,partition:1,replicas:\[2,1,0\],log\_dirs:\[any,any,any\]},{topic:test,partition:2,replicas:\[0,2,1\],log\_dirs:\[any,any,any\]}\]}我找json 在线格式化看看JSON在线 | JSON解析格式化—SO JSON在线工具对比一下发现partition 1 和 2 的replicas 不一样了回到顶部4.Topic的发送kafka-console-producer.sh4.1 生产无key消息## 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties4.2 生产有key消息加上属性--property parse.keytrue## 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.keytrue默认消息key与消息value间使用“Tab键”进行分隔所以消息key以及value中切勿使用转义字符(\t)可选参数参数值类型说明有效值–bootstrap-serverString要连接的服务器必需(除非指定–broker-list)如host1:prot1,host2:prot2–topicString(必需)接收消息的主题名称–batch-sizeInteger单个批处理中发送的消息数200(默认值)–compression-codecString压缩编解码器none、gzip(默认值)snappy、lz4、zstd–max-block-msLong在发送请求期间生产者将阻止的最长时间60000(默认值)–max-memory-bytesLong生产者用来缓冲等待发送到服务器的总内存33554432(默认值)–max-partition-memory-bytesLong为分区分配的缓冲区大小16384–message-send-max-retriesInteger最大的重试发送次数3–metadata-expiry-msLong强制更新元数据的时间阈值(ms)300000–producer-propertyString将自定义属性传递给生成器的机制如keyvalue–producer.configString生产者配置属性文件[–producer-property]优先于此配置 配置文件完整路径–propertyString自定义消息读取器parse.keytrue/false key.separatorkey.separatorignore.errortrue/false–request-required-acksString生产者请求的确认方式0、1(默认值)、all–request-timeout-msInteger生产者请求的确认超时时间1500(默认值)–retry-backoff-msInteger生产者重试前刷新元数据的等待时间阈值100(默认值)–socket-buffer-sizeIntegerTCP接收缓冲大小102400(默认值)–timeoutInteger消息排队异步等待处理的时间阈值1000(默认值)–sync同步发送消息–version显示 Kafka 版本不配合其他参数时显示为本地Kafka版本–help打印帮助信息回到顶部5. Topic的消费kafka-console-consumer.sh1. 新客户端从头消费--from-beginning(注意这里是新客户端,如果之前已经消费过了是不会从头消费的)下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning2. 正则表达式匹配topic进行消费--whitelist消费所有的topicsh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’消费所有的topic并且还从头消费sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning3.显示key进行消费--property print.keytruesh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.keytrue4. 指定分区消费--partition指定起始偏移量消费--offsetsh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 1005. 给客户端命名--group注意给客户端命名之后,如果之前有过消费那么--from-beginning就不会再从头消费了sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group6. 添加客户端属性--consumer-property这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group那肯定不行sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test--consumer-property group.idtest-consumer-group7. 添加客户端属性--consumer.config跟--consumer-property一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面,--consumer-property的优先级大于--consumer.configsh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties参数描述例子--group指定消费者所属组的ID--topic被消费的topic--partition指定分区 除非指定–offset否则从分区结束(latest)开始消费--partition 0--offset执行消费的起始offset位置 ;默认值: latest; /latest /earliest /偏移量--offset10--whitelist正则表达式匹配topic--topic就不用指定了; 匹配到的所有topic都会消费; 当然用了这个参数,--partition--offset等就不能使用了--consumer-property将用户定义的属性以keyvalue的形式传递给使用者--consumer-propertygroup.idtest-consumer-group--consumer.config消费者配置属性文件请注意[consumer-property]优先于此配置--consumer.configconfig/consumer.properties--property初始化消息格式化程序的属性print.timestamptrue,false 、print.keytrue,false 、print.valuetrue,false 、key.separatorkey.separator 、line.separatorline.separator、key.deserializerkey.deserializer、value.deserializervalue.deserializer--from-beginning从存在的最早消息开始而不是从最新消息开始,注意如果配置了客户端名称并且之前消费过那就不会从头消费了--max-messages消费的最大数据量若不指定则持续消费下去--max-messages100--skip-message-on-error如果处理消息时出错请跳过它而不是暂停--isolation-level设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted--formatterkafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter回到顶部6.kafka-leader-election Leader重新选举6.1 指定Topic指定分区用重新PREFERRED优先副本策略进行Leader重选举 sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 06.2 所有Topic所有分区用重新PREFERRED优先副本策略进行Leader重选举sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred --all-topic-partitions6.3 设置配置文件批量指定topic和分区进行Leader重选举先配置leader-election.json文件{ partitions: [ { topic: test_create_topic4, partition: 1 }, { topic: test_create_topic4, partition: 2 } ] }sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json相关可选参数参数描述例子--bootstrap-server指定kafka服务指定连接到的kafka服务–bootstrap-server localhost:9092--topic指定Topic此参数跟--all-topic-partitions和path-to-json-file三者互斥--partition指定分区,跟--topic搭配使用--election-type两个选举策略(PREFERRED:优先副本选举,如果第一个副本不在线的话会失败;UNCLEAN: 策略)--all-topic-partitions所有topic所有分区执行Leader重选举; 此参数跟--topic和path-to-json-file三者互斥--path-to-json-file配置文件批量选举此参数跟--topic和all-topic-partitions三者互斥回到顶部7. 持续批量推送消息kafka-verifiable-producer.sh单次发送100条消息--max-messages 100一共要推送多少条默认为-1-1表示一直推送到进程关闭位置sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092--max-messages 100每秒发送最大吞吐量不超过消息--throughput 100推送消息时的吞吐量单位messages/sec。默认为-1表示没有限制sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092--throughput 100发送的消息体带前缀--value-prefixsh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092--value-prefix 666注意--value-prefix 666必须是整数,发送的消息体的格式是加上一个 点号.例如666.其他参数--producer.config CONFIG_FILE指定producer的配置文件--acks ACKS每次推送消息的ack值默认是-1回到顶部8. 持续批量拉取消息kafka-verifiable-consumer持续消费sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4单次最大消费10条消息--max-messages 10sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4--max-messages 10相关可选参数参数描述例子--bootstrap-server指定kafka服务指定连接到的kafka服务;–bootstrap-server localhost:9092--topic指定消费的topic--group-id消费者id不指定的话每次都是新的组idgroup-instance-id消费组实例ID,唯一值--max-messages单次最大消费的消息数量--enable-autocommit是否开启offset自动提交默认为false--reset-policy当以前没有消费记录时选择要拉取offset的策略可以是earliest,latest,none。默认是earliest--assignment-strategyconsumer分配分区策略默认是org.apache.kafka.clients.consumer.RangeAssignor--consumer.config指定consumer的配置文件回到顶部9.生产者压力测试kafka-producer-perf-test.sh1. 发送1024条消息--num-records 100并且每条消息大小为1KB--record-size 1024最大吞吐量每秒10000条--throughput 100sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.serverslocalhost:9092 --record-size 1024你可以通过LogIKM查看分区是否增加了对应的数据大小从LogIKM 可以看到发送了1024条消息; 并且总数据量1M; 1024条*1024byte 1M;2. 用指定消息文件--payload-file发送100条消息最大吞吐量每秒100条--throughput 100先配置好消息文件batchmessage.txt然后执行命令 发送的消息会从batchmessage.txt里面随机选择; 注意这里我们没有用参数--payload-delimeter指定分隔符默认分隔符是\n换行;bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100 --producer-props bootstrap.serverslocalhost:9090 --payload-file config/batchmessage.txt验证消息可以通过 LogIKM 查看发送的消息相关可选参数参数描述例子--topic指定消费的topic--num-records发送多少条消息--throughput每秒消息最大吞吐量--producer-props生产者配置, k1v1,k2v2--producer-propsbootstrap.servers localhost:9092,client.idtest_client--producer.config生产者配置文件--producer.configconfig/producer.propeties--print-metrics在test结束的时候打印监控信息,默认false--print-metricstrue--transactional-id指定事务 ID测试并发事务的性能时需要只有在 --transaction-duration-ms 0 时生效默认值为 performance-producer-default-transactional-id--transaction-duration-ms指定事务持续的最长时间超过这段时间后就会调用 commitTransaction 来提交事务只有指定了 0 的值才会开启事务默认值为 0--record-size一条消息的大小byte; 和 --payload-file 两个中必须指定一个但不能同时指定--payload-file指定消息的来源文件只支持 UTF-8 编码的文本文件文件的消息分隔符通过--payload-delimeter指定,默认是用换行\nl来分割的和 --record-size 两个中必须指定一个但不能同时指定 ; 如果提供的消息--payload-delimeter如果通过--payload-file指定了从文件中获取消息内容那么这个参数的意义是指定文件的消息分隔符默认值为 \n即文件的每一行视为一条消息如果未指定--payload-file则此参数不生效发送消息的时候是随机送文件里面选择消息发送的;回到顶部10.消费者压力测试kafka-consumer-perf-test.sh消费100条消息--messages 100sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100相关可选参数参数描述例子--bootstrap-server--consumer.config消费者配置文件--date-format结果打印出来的时间格式化默认yyyy-MM-dd HH:mm:ss:SSS--fetch-size单次请求获取数据的大小默认1048576--topic指定消费的topic--from-latest--group消费组ID--hide-header如果设置了,则不打印header信息--messages需要消费的数量--num-fetch-threadsfeth 数据的线程数默认1--print-metrics结束的时候打印监控数据--show-detailed-stats--threads消费线程数;默认 10回到顶部11.删除指定分区的消息kafka-delete-records.sh删除指定topic的某个分区的消息删除至offset为1024先配置json文件offset-json-file.json{partitions: [{topic: test1, partition: 0, offset: 1024}], version:1 }在执行命令sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json验证 通过 LogIKM 查看发送的消息从这里可以看出来,配置offset: 1024的意思是从最开始的地方删除消息到 1024的offset; 是从最前面开始删除的回到顶部12. 查看Broker磁盘信息查询指定topic磁盘信息--topic-list topic1,topic2sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2查询指定Broker磁盘信息--broker-list 0 broker1,broker2sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0例如我一个3分区3副本的Topic的查出来的信息logDirBroker中配置的log.dir{ version: 1, brokers: [{ broker: 0, logDirs: [{ logDir: /Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0, error: null, partitions: [{ partition: test2-1, size: 0, offsetLag: 0, isFuture: false }, { partition: test2-0, size: 0, offsetLag: 0, isFuture: false }, { partition: test2-2, size: 0, offsetLag: 0, isFuture: false }] }] }, { broker: 1, logDirs: [{ logDir: /Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1, error: null, partitions: [{ partition: test2-1, size: 0, offsetLag: 0, isFuture: false }, { partition: test2-0, size: 0, offsetLag: 0, isFuture: false }, { partition: test2-2, size: 0, offsetLag: 0, isFuture: false }] }] }, { broker: 2, logDirs: [{ logDir: /Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2, error: null, partitions: [{ partition: test2-1, size: 0, offsetLag: 0, isFuture: false }, { partition: test2-0, size: 0, offsetLag: 0, isFuture: false }, { partition: test2-2, size: 0, offsetLag: 0, isFuture: false }] }] }, { broker: 3, logDirs: [{ logDir: /Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3, error: null, partitions: [] }] }] }折叠如果你觉得通过命令查询磁盘信息比较麻烦你也可以通过 LogIKM 查看回到顶部12. 消费者组管理 kafka-consumer-groups.sh1. 查看消费者列表--listsh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list先调用MetadataRequest拿到所有在线Broker列表 再给每个Broker发送ListGroupsRequest请求获取 消费者组数据2. 查看消费者组详情--describeDescribeGroupsRequest查看消费组详情--group或--all-groups查看指定消费组详情--groupsh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group查看所有消费组详情--all-groupssh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups查看该消费组 消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID等等信息查询消费者成员信息--members所有消费组成员信息sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090指定消费组成员信息sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090查询消费者状态信息--state所有消费组状态信息sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090指定消费组状态信息sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:90903. 删除消费者组--deleteDeleteGroupsRequest删除消费组–delete删除指定消费组--groupsh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090删除所有消费组--all-groupssh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常Error: Deletion of some consumer groups failed: * Group test2_consumer_group could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.4. 重置消费组的偏移量--reset-offsets能够执行成功的一个前提是 消费组这会是不可用状态;下面的示例使用的参数是:--dry-run;这个参数表示预执行,会打印出来将要处理的结果; 等你想真正执行的时候请换成参数--excute;下面示例 重置模式都是--to-earliest重置到最早的;请根据需要参考下面相关重置Offset的模式换成其他模式;重置指定消费组的偏移量--group重置指定消费组的所有Topic的偏移量--all-topicsh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic重置指定消费组的指定Topic的偏移量--topicsh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2重置所有消费组的偏移量--all-group重置所有消费组的所有Topic的偏移量--all-topicsh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic重置所有消费组中指定Topic的偏移量--topicsh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2--reset-offsets后面需要接重置的模式相关重置Offset的模式参数描述例子--to-earliest:重置offset到最开始的那条offset(找到还未被删除最早的那个offset)--to-current:直接重置offset到当前的offset也就是LOE--to-latest重置到最后一个offset--to-datetime:重置到指定时间的offset;格式为:YYYY-MM-DDTHH:mm:SS.sss;--to-datetime 2021-6-26T00:00:00.000--to-offset重置到指定的offset,但是通常情况下,匹配到多个分区,这里是将匹配到的所有分区都重置到这一个值; 如果 1.目标最大offset--to-offset, 这个时候重置为目标最大offset2.目标最小offset--to-offset则重置为最小; 3.否则的话才会重置为--to-offset的目标值;一般不用这个--to-offset 3465||--shift-by| 按照偏移量增加或者减少多少个offset正的为往前增加;负的往后退当然这里也是匹配所有的; |--shift-by 100、--shift-by -100||--from-file| 根据CVS文档来重置; 这里下面单独讲解 | |--from-file着重讲解一下上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的offset不过**--from-file**可以让我们更灵活一点;先配置cvs文档 格式为: Topic:分区号: 重置目标偏移量test2,0,100 test2,1,200 test2,2,300执行命令sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv5. 删除偏移量delete-offsets能够执行成功的一个前提是 消费组这会是不可用状态;偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费;sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2相关可选参数参数描述例子--bootstrap-server指定连接到的kafka服务;–bootstrap-server localhost:9092--list列出所有消费组名称--list--describe查询消费者描述信息--describe--group指定消费组--all-groups指定所有消费组--members查询消费组的成员信息--state查询消费者的状态信息--offsets在查询消费组描述信息的时候,这个参数会列出消息的偏移量信息; 默认就会有这个参数的;dry-run重置偏移量的时候,使用这个参数可以让你预先看到重置情况这个时候还没有真正的执行,真正执行换成--excute;默认为dry-run--excute真正的执行重置偏移量的操作;--to-earliest将offset重置到最早to-latest将offset重置到最近回到顶部附件ConfigCommand 的一些可选配置Topic相关可选配置keyvalue示例cleanup.policy清理策略compression.type压缩类型(通常建议在produce端控制)delete.retention.ms压缩日志的保留时间file.delete.delay.msflush.messages持久化message限制flush.ms持久化频率follower.replication.throttled.replicasflowwer副本限流 格式分区号:副本follower号,分区号:副本follower号0:1,1:1index.interval.bytesleader.replication.throttled.replicasleader副本限流 格式分区号:副本Leader号0:0max.compaction.lag.msmax.message.bytes最大的batch的message大小message.downconversion.enablemessage是否向下兼容message.format.versionmessage格式版本message.timestamp.difference.max.msmessage.timestamp.typemin.cleanable.dirty.ratiomin.compaction.lag.msmin.insync.replicas最小的ISRpreallocateretention.bytes日志保留大小(通常按照时间限制)retention.ms日志保留时间segment.bytessegment的大小限制segment.index.bytessegment.jitter.mssegment.mssegment的切割时间unclean.leader.election.enable是否允许非同步副本选主Broker相关可选配置keyvalue示例advertised.listenersbackground.threadscompression.typefollower.replication.throttled.rateleader.replication.throttled.ratelistener.security.protocol.maplistenerslog.cleaner.backoff.mslog.cleaner.dedupe.buffer.sizelog.cleaner.delete.retention.mslog.cleaner.io.buffer.load.factorlog.cleaner.io.buffer.sizelog.cleaner.io.max.bytes.per.secondlog.cleaner.max.compaction.lag.mslog.cleaner.min.cleanable.ratiolog.cleaner.min.compaction.lag.mslog.cleaner.threadslog.cleanup.policylog.flush.interval.messageslog.flush.interval.mslog.index.interval.byteslog.index.size.max.byteslog.message.downconversion.enablelog.message.timestamp.difference.max.mslog.message.timestamp.typelog.preallocatelog.retention.byteslog.retention.mslog.roll.jitter.mslog.roll.mslog.segment.byteslog.segment.delete.delay.msmax.connectionsmax.connections.per.ipmax.connections.per.ip.overridesmessage.max.bytesmetric.reportersmin.insync.replicasnum.io.threadsnum.network.threadsnum.recovery.threads.per.data.dirnum.replica.fetchersprincipal.builder.classreplica.alter.log.dirs.io.max.bytes.per.secondsasl.enabled.mechanismssasl.jaas.configsasl.kerberos.kinit.cmdsasl.kerberos.min.time.before.reloginsasl.kerberos.principal.to.local.rulessasl.kerberos.service.namesasl.kerberos.ticket.renew.jittersasl.kerberos.ticket.renew.window.factorsasl.login.refresh.buffer.secondssasl.login.refresh.min.period.secondssasl.login.refresh.window.factorsasl.login.refresh.window.jittersasl.mechanism.inter.broker.protocolssl.cipher.suitesssl.client.authssl.enabled.protocolsssl.endpoint.identification.algorithmssl.key.passwordssl.keymanager.algorithmssl.keystore.locationssl.keystore.passwordssl.keystore.typessl.protocolssl.providerssl.secure.random.implementationssl.trustmanager.algorithmssl.truststore.locationssl.truststore.passwordssl.truststore.typeunclean.leader.election.enableUsers相关可选配置keyvalue示例SCRAM-SHA-256SCRAM-SHA-512consumer_byte_rate针对消费者user进行限流producer_byte_rate针对生产者进行限流request_percentage请求百分比clients相关可选配置keyvalue示例consumer_byte_rateproducer_byte_raterequest_percentage这两年IT行业面临经济周期波动与AI产业结构调整的双重压力确实有很多运维与网络工程师因企业缩编或技术迭代而暂时失业。很多人都在提运维网工失业后就只能去跑滴滴送外卖了但我想分享的是对于运维人员来说即便失业以后仍然有很多副业可以尝试。运维副业方向运维千万不要再错过这些副业机会第一个是知识付费类副业输出经验打造个人IP在线教育平台讲师操作路径在慕课网、极客时间等平台开设《CCNA实战》《Linux运维从入门到精通》等课程或与培训机构合作录制专题课。收益模式课程销售分成、企业内训。技术博客与公众号运营操作路径撰写网络协议解析、故障排查案例、设备评测等深度文章通过公众号广告、付费专栏及企业合作变现。收益关键每周更新2-3篇原创结合SEO优化与社群运营。第二个是技术类副业深耕专业领域变现企业网络设备配置与优化服务操作路径为中小型企业提供路由器、交换机、防火墙等设备的配置调试、性能优化及故障排查服务。可通过本地IT服务公司合作或自建线上接单平台获客。收益模式按项目收费或签订年度维护合同。远程IT基础设施代维操作路径通过承接服务器监控、日志分析、备份恢复等远程代维任务。适合熟悉Zabbix、ELK等技术栈的工程师。收益模式按工时计费或包月服务。网络安全顾问与渗透测试操作路径利用OWASP Top 10漏洞分析、Nmap/BurpSuite等工具为企业提供漏洞扫描、渗透测试及安全加固方案。需考取CISP等认证提升资质。收益模式单次渗透测试报告收费长期安全顾问年费。比如不久前跟我一起聊天的一个粉丝他自己之前是大四实习的时候做的运维发现运维7*24小时待命受不了就准备转网安学了差不多2个月然后开始挖漏洞光是补天的漏洞奖励也有个四五千他说自己每个月的房租和饭钱就够了。为什么我会推荐你网安是运维人员的绝佳副业转型方向?1.你的经验是巨大优势:你比任何人都懂系统、网络和架构。漏洞挖掘、内网渗透、应急响应这些核心安全能力本质上是“攻击视角下的运维”。你的运维背景不是从零开始而是降维打击。2.越老越吃香规避年龄危机:安全行业极度依赖经验。你的排查思路、风险意识和对复杂系统的理解能力会随着项目积累而愈发珍贵真正做到“姜还是老的辣”。3.职业选择极其灵活:你可以加入企业成为安全专家可以兼职“挖洞“获取丰厚奖金甚至可以成为自由顾问。这种多样性为你提供了前所未有的抗风险能力。4.市场需求爆发前景广阔:在国家级政策的推动下从一线城市到二三线地区安全人才缺口正在急剧扩大。现在布局正是抢占未来先机的黄金时刻。运维转行学习路线一第一阶段网络安全筑基1. 阶段目标你已经有运维经验了所以操作系统、网络协议这些你不是零基础。但要学安全得重新过一遍——只不过这次我们是带着“安全视角”去学。2. 学习内容**操作系统强化**你需要重点学习 Windows、Linux 操作系统安全配置对比运维工作中常规配置与安全配置的差异深化系统安全认知比如说日志审计配置为应急响应日志分析打基础。**网络协议深化**结合过往网络协议应用经验聚焦 TCP/IP 协议簇中的安全漏洞及防护机制如 ARP 欺骗、TCP 三次握手漏洞等为 SRC 漏扫中协议层漏洞识别铺垫。**Web 与数据库基础**补充 Web 架构、HTTP 协议及 MySQL、SQL Server 等数据库安全相关知识了解 Web 应用与数据库在网安中的作用。**编程语言入门**学习 Python 基础语法掌握简单脚本编写为后续 SRC 漏扫自动化脚本开发及应急响应工具使用打基础。**工具实战**集中训练抓包工具Wireshark、渗透测试工具Nmap、漏洞扫描工具Nessus 基础版的使用结合模拟场景练习工具应用掌握基础扫描逻辑为 SRC 漏扫工具进阶做准备。二第二阶段漏洞挖掘与 SRC 漏扫实战1. 阶段目标这阶段是真正开始“动手”了。信息收集、漏洞分析、工具联动一样不能少。熟练运用漏洞挖掘及 SRC 漏扫工具具备独立挖掘常见漏洞及 SRC 平台漏扫实战能力尝试通过 SRC 挖洞搞钱不管是低危漏洞还是高危漏洞先挖到一个。2. 学习内容信息收集实战结合运维中对网络拓扑、设备信息的了解强化基本信息收集、网络空间搜索引擎Shodan、ZoomEye、域名及端口信息收集技巧针对企业级网络场景开展信息收集练习为 SRC 漏扫目标筛选提供支撑。漏洞原理与分析深入学习 SQL 注入、CSRF、文件上传等常见漏洞的原理、危害及利用方法结合运维工作中遇到的类似问题进行关联分析明确 SRC 漏扫重点漏洞类型。工具进阶与 SRC 漏扫应用系统学习 SQLMap、BurpSuite、AWVS 等工具的高级功能开展工具联用实战训练专项学习 SRC 漏扫流程包括 SRC 平台规则解读如漏洞提交规范、奖励机制、漏扫目标范围界定、漏扫策略制定全量扫描 vs 定向扫描、漏扫结果验证与复现实战训练使用 AWVSBurpSuite 组合开展 SRC 平台目标漏扫练习 “扫描 - 验证 - 漏洞报告撰写 - 平台提交” 全流程。SRC 实战演练选择合适的 SRC 平台如补天、CNVD进行漏洞挖掘与漏扫实战积累实战经验尝试获取挖洞收益。恭喜你如果学到这里你基本可以下班搞搞副业创收了并且具备渗透测试工程师必备的「渗透技巧」、「溯源能力」让你在黑客盛行的年代别背锅工作实现升职加薪的同时也能开创副业创收如果你想要入坑黑客网络安全笔者给大家准备了一份全网最全的网络安全资料包需要保存下方图片微信扫码即可前往获取!因篇幅有限仅展示部分资料需要点击下方链接即可前往获取CSDN大礼包《黑客网络安全入门进阶学习资源包》免费分享三第三阶段渗透测试技能学习1. 阶段目标全面掌握渗透测试理论与实战技能能够独立完成渗透测试项目编写规范的渗透测试报告具备渗透测试工程师岗位能力为护网红蓝对抗及应急响应提供技术支撑。2. 学习内容渗透测试核心理论系统学习渗透测试流程、方法论及法律法规知识明确渗透测试边界与规范与红蓝对抗攻击边界要求一致。实战技能训练开展漏洞扫描、漏洞利用、电商系统渗透测试、内网渗透、权限提升Windows、Linux、代码审计等实战训练结合运维中熟悉的系统环境设计测试场景强化红蓝对抗攻击端技术能力。工具开发实践基于 Python 编程基础学习渗透测试工具开发技巧开发简单的自动化测试脚本可拓展用于 SRC 漏扫自动化及应急响应辅助工具。报告编写指导学习渗透测试报告的结构与编写规范完成多个不同场景的渗透测试报告撰写练习与 SRC 漏洞报告、应急响应报告撰写逻辑互通。四第四阶段企业级安全攻防含红蓝对抗、应急响应1. 阶段目标掌握企业级安全攻防、护网红蓝对抗及应急响应核心技能考取网安行业相关证书。2. 学习内容护网红蓝对抗专项红蓝对抗基础学习护网行动背景、红蓝对抗规则攻击范围、禁止行为、红蓝双方角色职责红队模拟攻击蓝队防御检测与应急处置红队实战技能强化内网渗透、横向移动、权限维持、免杀攻击等高级技巧模拟护网中常见攻击场景蓝队实战技能学习安全设备防火墙、IDS/IPS、WAF联动防御配置、安全监控平台SOC使用、攻击行为研判与溯源方法模拟护网演练参与团队式红蓝对抗演练完整体验 “攻击 - 检测 - 防御 - 处置” 全流程。应急响应专项应急响应流程学习应急响应 6 步流程准备 - 检测 - 遏制 - 根除 - 恢复 - 总结掌握各环节核心任务实战技能开展操作系统入侵响应如病毒木马清除、异常进程终止、数据泄露应急处置、漏洞应急修补等实战训练工具应用学习应急响应工具如 Autoruns、Process Monitor、病毒分析工具的使用提升处置效率案例复盘分析真实网络安全事件应急响应案例如勒索病毒事件总结处置经验。其他企业级攻防技能学习社工与钓鱼、CTF 夺旗赛解析等内容结合运维中企业安全防护需求深化理解。证书备考针对网安行业相关证书考试内容含红蓝对抗、应急响应考点进行专项复习参加模拟考试查漏补缺。运维转行网络攻防知识库分享网络安全这行不是会几个工具就能搞定的。你得有体系懂原理能实战。尤其是从运维转过来的别浪费你原来的经验——你比纯新人强多了。但也要沉得住气别学了两天Web安全就觉得自己是黑客了。内网、域渗透、代码审计、应急响应要学的还多着呢。如果你真的想转按这个路子一步步走没问题。如果你只是好奇我劝你再想想——这行要持续学习挺累的但也是真有意思。关于如何学习网络安全笔者也给大家整理好了全套网络安全知识库需要的可以扫码获取因篇幅有限仅展示部分资料需要点击下方链接即可前往获取CSDN大礼包《黑客网络安全入门进阶学习资源包》免费分享1、网络安全意识2、Linux操作系统3、WEB架构基础与HTTP协议4、Web渗透测试5、渗透测试案例分享6、渗透测试实战技巧7、攻防对战实战8、CTF之MISC实战讲解关于如何学习网络安全笔者也给大家整理好了全套网络安全知识库需要的可以扫码获取因篇幅有限仅展示部分资料需要点击下方链接即可前往获取CSDN大礼包《黑客网络安全入门进阶学习资源包》免费分享