简介

我的技术笔记:Inspired by lvt4j and reference from.

本博客使用mdbook+github page进行搭建,相关语法可参考 mdbook官方指南

笔记已经推送到 个人github仓库

学习方向

基础学习: 通读官方文档,熟悉API.

拔高学习: 源码核心设计,接口,机制,流程,抽象等.

技术关键字

Kafka: Streaming Processing Platform

Akka:Reactive Architecture

SMACK: Spark/Mesos/Akka/Cassandra/Kafka

学习进度

技术进度
Akkahttps://doc.akka.io/docs/akka/current/mailboxes.html
Eshttps://www.elastic.co/guide/cn/elasticsearch/guide/current/full-text-search.html
Kafkahttps://developer.confluent.io/patterns/
Springhttps://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-factory-autowire
Redishttps://redis.io/docs/getting-started/
Mysql尚未开始

技术栈

后端前端程度
Spring-BootSpring-Boot-Admin
Spring-CloudSpring Cloud ConsulSpring Cloud ConfigSpring Cloud GatewaySpring Cloud OpenFeign
MybatisMyBatisPlus Mybatis-spring guava(google的java编码工具包) lombok(简化java代码) lvt4j-rbac(可视化rbac模型权限控制服务) logstash-logback-encoderGradle ,Velocity(Java模板引擎)
html/js/css jQuery Bootstrap ace(常见的后台系统开发控件集) ace组件代码 jstpl(一个前端模板渲染引擎) lvt4js(js工具总结)必备
Redis ElasticSearch kafkajQuery-ui jQuery-ui-datetimepicker select2 uploadify(基于H5的文件上传插件) math(js精度处理) jquery.tagsinput-revisited.js精通
atomikos(分布式数据库事务集成管理) javamail jave(java集成FFmpeg的视频工具) mapdb(基于磁盘的嵌入式java集合对象存储) httpcomponents(http请求工具) dom4j(java的xml解析) poi(java版office文档生成处理) canal(阿里的数据库监听) akka(分布式异步编程框架) hazelcast(分布式内存数据网格) seata(阿里分布式事务框架)Consul(HashiCorp的网络工具,提供服务发现和服务网格)
单元测试:junit4/junit5assertjmockito
highcharts(图形图表) fontawesome(矢量图形字体库)了解

Coding

admin

common-design

需要给业务层提供通用类库,该以怎么样的方式提供?通常有以下几种方式:

1.类似于apache common这种模式,应用层调用库。

2.类似于Spring boot stater+auto-configuration这种模式。

3.类似于IOC/框架模式/模板(XXXTemplate)/策略(回调)模式,框架调用应用层代码。典型的如JDBCTemplate.

4.类似于普通Spring模式,提供库,并且暴露配置且提供默认配置,由应用层自定义设置。

common-elasticsearch

Kafka学习笔记

学习笔记

1. kafka前瞻—架构维度

2. kafka安装以及常用的命令

3. 基础开发及消费者提交维护offset不同粒度

4. ISR,OSR,AR,LW,HW,LEO,ACK原理理论

书籍

1. chatGPT写的一篇《Kafka权威指南》总结

2. 《Kafka权威指南》笔记

地址

git项目地址

Kafka学习笔记

1. kafka前瞻—架构维度

kafka是什么?分布式消息中间件,分布式流式消息(事件)处理平台

服务——>服务

网络到分布式

  1. 单点问题 2. 性能问题

分布式:一致性,可靠性,可扩展性

AKF

x-高可用 横向扩展,水平复制(没解决的问题:一发动全身)

y-业务功能划分 功能分解扩展,比如微服务(好的业务迟早会遇见数据瓶颈)

z-数据分区,比如分库分表

数据处理

大数据

无关的数据 必然分治——>无关的数据就分散到不同的分区里,以追求并发并行

有关的数据 聚合——> 有关的数据,一定要按原有顺序发送到同一分区里

分区内部有序,分区外部无序

Kafka AKF分析

x轴,对parttion进行副本备份,副本(理论上可以读写分离,但容易出现一致性问题,干脆只能在主P上进行读写)

y轴:topic,不同的业务使用不同topic

z轴:partiton,对无关的数据打散到不同的分片,分而治之。将相关的数据按顺序聚合到同一个分片

Zookeeper

单机管理->主从集群 分布式协调,zk不用于存储

broker与partition的关系?

broker与zookeeper的关系?

controller

Metadata:topic,partition,broker

旧版本:producer是通过zookeeper获取集群节点信息的

新老版本的区别:角色之间通信,在业务层次上不再依赖zookeeper(减少zk的负载)

Producer

在并发情况下,注意一致性(顺序性保证)的问题

lock() { 
		sql
		producer.produce()
} unlock();

数据保存在哪里?

kafka的broker的partition里

Consumer

consumer与patition的关系:1:n/ 1:1

思考,consumer与patition的关系n:1可不可以? 破坏有序性

group

不同业务组之间,需要消费同一topic的数据,可以使用不同的group

在单一的使用场景下,先要保证,即便追求性能,用多个consumer,应该注意,不能一个分区由多个consumer消费

数据的重复利用是站在group上

offset

比如consumer重启,会不会导致数据重复消费和丢失,围绕的是消费的进度offset

起初consumer在runtime里维护自身的consumer

旧版本的offset是通过consumer与zookeeper通信维护的

新版kafka能自己维护offset

offset持久化节奏,频率,先后?

两大问题:

  1. 丢失
  2. 重复消费

异步的:每间隔5s,先处理业务逻辑,异步提交offset,重复消费

同步的:处理业务逻辑,同步提交offset

Consume流程没处理好,提交offset在业务逻辑处理之前,导致丢失

hbase,es,myisam顺序写

新版offset的维护

consumer->broker(runtime)->mem metadata->磁盘,持久层

总结

  1. 本节课从分布式AFK角度,分析了kafka作为一个分布式消息中间件(高可用,高扩展),从架构角度对xyz轴分析,分别对应由副本,partition,topic的出现。
  2. 同时既然作为一个集群,就需要有一个协调者,引入了zookeeper,新旧版本的kafka对zookeeper这块进行了比较大的升级。
  3. zookeeper管理的本质其实是tomcat进程,逻辑意义是broker,里面会有一个controller(主)的概念
  4. 为了消息的顺序性消费,引入了producer和consumer,以及从架构角度如何保证消息顺序消费,不重复消费,以及消息丢失等问题。
  5. 为了解决丢失和重复消费的问题,引入了offset消息消费进度的概念,以及放在哪里进行维护比较好(zookeeper?kafka?三方比如redis/mysql)

思考

  1. Redis哨兵很像 kafka 集群中的 zookeeper 的功能
  2. 微服务AKF拆分,涉及的三种集群模式:主主,主从,主备
    1. 前两种比较容易理解,最后一种备节点不提供读写,随时顶替上去。

Kafka学习笔记

2. kafka安装以及常用的命令

zookeeper的安装

  • brew方式安装

    • 参考博客
    • 启动zookeeper: zookeeper alongso_pro$ zkServer
    • 查看zookeeper的状态: zookeeper alongso_pro$ zkCli
  • 解压缩方式安装:

    • 参考博客
    • zookeeper安装目录:/usr/local/myapp/zookeeper-3.4.12
    • 进入zookeeper安装目录,启动zookeeper:sudo ./bin/zkServer.sh start
    • 停止zookeeper:sudo ./bin/zkServer.sh stop

kafka的安装

  • brew方式安装
    • 进入kafka安装目录,启动zookeeper:
      • bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
    • 启动kafka:
      • bin/kafka-server-start /usr/local/etc/kafka/server.properties
  • 解压缩方式安装:
    • 解压缩方式安装
      • 进入指定的文件(安装目录)创建logs文件夹,并且赋予用户能写 的权限(将/usr/local/myapp/kafka下的文件都赋予)
      • 启动zookeeper:
        • kafka alongso_pro$ ./bin/zookeeper-server-start.sh /usr/local/myapp/kafka/config/zookeeper.properties &
      • 启动kafka:
        • kafka alongso_pro$ ./bin/kafka-server-start.sh /usr/local/myapp/kafka/config/server.properties &
    • 遇到的问题
      • 启动消费者的时候报错:Connection to node -1 could not be established. Broker may not be available.
        • 参考博客参考
        • 傻逼了,根据端口访问加什么:
          • ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic1 --from-beginning
      • zookeeper启动的时候报错 java.io.IOException: No snapshot found, but there are log entries. Something is broken!

kafka常用命令

-- 低于kafka2.2版本的所有的命令需要依赖zookeeper节点,不支持--bootstrap-server命令
-- 给kafka创建topic:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    - –create 创建主题命令
    - –zookeeper localhost:2181 指定zookeeper
    - –replication-factor 1 指定副本个数
    - –partitions 1 指定分区个数
    - –topic test 主题名称为“test”
-- 查看topic list
  - ./bin/kafka-topics.sh --list --zookeeper localhost:2181
-- 打开一个窗口输入命令创建一个生产者:
  - ./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test


-- 高于2.2版本,支持--bootstrap-server命令
-- topic创建
./bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic test2

-- 查看topic
./bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic test2

-- 查看消费者组消费的进度offset
./bin/kafka-consumer-groups.sh  --bootstrap-server 127.0.0.1:9092 --group consumer0 --describe

-- 消费消息
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

-- 生产消息
./bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test

-- 查看list
./bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092

-- 生产&发送键-值对消息
./bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --property parse.key=true

-- 消费&打印键-值对消息
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --property print.key=true


参考资料

  1. topic创建参考
  2. 操作参考
  3. 官方文档

topic+partition消费逻辑

partition

分区:桶

如果没有顺序上的约束的话:水平扩展

消息V

一旦消息(消息很多,但是消息种类一定很多),而且需要同一类消息的有序性

消息是KV,相同的key一定去打到一个分区里

Broker会保证producer同key(类型)消息的顺序

一个分区可能有不同的key,且不同的key是交叉的,相同的key在一个分区里没有排列在一起。

拉取VS推送

推送:说的是server,主动去推送,网卡打满

拉取:consumer,自主,按需,去订阅拉取server的数据

拉取粒度

如何拉取

  1. Kafka consumer以什么粒度拉取消息

    出于性能考虑,每次IO会批量拉取数据

如何维护

  1. Kafka consumer以什么粒度更新&持久化offset?

    单线程:一条一条处理的时候,按顺序处理的时候,来更新offset,速度比较慢,硬件资源浪费

    1-1多线程:offset维护?按条还是按批次?

    什么情况下多线程的优势发挥到极致?具备隔离性

    多线程的情况下,要加如记录级的判定顺序,决策更新谁的offset

    2多线程:流式计算,充分利用线程

批次(如何保证顺序处理)

  1. consumer拉取到消息用多线程还是单线程去处理?
  2. Offset如何维护?

consume使用单线程与多线程的利弊

单线程:

按顺序,单条处理,offset就是递增的,无论对db,offset频率,成本有点高,CPU,网卡,资源浪费,粒度比较细

流式的多线程:

能多线程的多线程,但是,将整个批次的事务环节交给一个线程,做到这个批次,要么成功,要么失败,减少对DB的压力,和offset频率的压力,更多的去利用cpu和网卡硬件资源,粒度比较粗

Kafka学习笔记

3. 基础开发及消费者提交维护offset不同粒度

编码实现kafka生产与消费消息

  1. 零拷贝

  2. producer面向的是broker

  3. consumer消费消息依赖于消费者组

  4. kafka自己存储数据,指定消息的offset

  5. 自动异步提交时(默认每5s提交一次),导致的问题?

    1. 重复消费&消息丢失
    2. 场景
      1. 还没到时间,挂了,没提交,重起一个consumer,参照offset的时候,会重复消费
      2. 一个批次的数据还没写数据库成功,但是这个批次的offset被异步提交了,挂了,重起一个consumer,参照offset的时候,会导致消息丢失。
  6. 指定一次拉取的最多条数

  7. 指定拉取一次的超时时间

  8. 消费的时候可以指定开始消费的下标

    1. latest和earliest区别
      1. latest—表示一个新的消费者组,刚启动时不消费历史数据(即之前已经被别的组消费的数据)
      2. earliest—表示新消费者组启动之后,会开始重新消费历史数据
  9. 分区分配:一个消费者可以消费同一个topic多个分区的数据

  10. 手动提交offset的配置,按照分区进行处理消息

实践

  1. 单节点kafka只能有一个副本,可以有多个分区
  2. kafka开启自动提交,默认是每隔5s自动提交一次offset
  3. latest和earliest区别
    1. earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    2. latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    3. 结论:提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。

生产者代码

  1. 生产者架构图

    kafka生产者流程图

  2. 创建Kafka生产者

    1. boostrap.servers
    2. key.serializer
    3. value.serializer
  3. 代码示例

    package com.focus.kafka.produce;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.jupiter.api.Test;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class ProducerTest {
    
        // ./bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic test2
        @Test
        public void producer() throws ExecutionException, InterruptedException {
    
            String topic = "test2";
            Properties p = new Properties();
            p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            //kafka  持久化数据的MQ  数据-> byte[],不会对数据进行干预,双方要约定编解码
            //kafka是一个app::使用零拷贝  sendfile 系统调用实现快速数据消费
            p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
    
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
    
            //现在的producer就是一个提供者,面向的其实是broker,虽然在使用的时候我们期望把数据打入topic
    
            /*
            test2,2partition,三种商品,每种商品有线性的3个ID,相同的商品最好去到一个分区里
             */
    
    
            while (true) {
                for (int i = 0; i < 3; i++) {
                    for (int j = 0; j < 3; j++) {
                        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item" + j, "val" + i);
    //                    Future<RecordMetadata> send = producer.send(record);
                        Future<RecordMetadata> send = producer.send(record, new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata metadata, Exception exception) {
                                if (exception != null) {
                                    exception.printStackTrace();
                                }
                            }
                        });
                        RecordMetadata rm = send.get();
                        int partition = rm.partition();
                        long offset = rm.offset();
                        System.out.println("key: " + record.key() + " val: " + record.value() + " partition: " + partition + " offset: " + offset);
    
                    }
                }
            }
        }
    
    
        /**
         * key: item0 val: val0 partition: 1 offset: 0
         * key: item1 val: val0 partition: 0 offset: 0
         * key: item2 val: val0 partition: 1 offset: 1
         * key: item0 val: val1 partition: 1 offset: 2
         * key: item1 val: val1 partition: 0 offset: 1
         * key: item2 val: val1 partition: 1 offset: 3
         * key: item0 val: val2 partition: 1 offset: 4
         * key: item1 val: val2 partition: 0 offset: 2
         * key: item2 val: val2 partition: 1 offset: 5
         * key: item0 val: val0 partition: 1 offset: 6
         * key: item1 val: val0 partition: 0 offset: 3
         * key: item2 val: val0 partition: 1 offset: 7
         * key: item0 val: val1 partition: 1 offset: 8
         * key: item1 val: val1 partition: 0 offset: 4
         * key: item2 val: val1 partition: 1 offset: 9
         * key: item0 val: val2 partition: 1 offset: 10
         * key: item1 val: val2 partition: 0 offset: 5
         * key: item2 val: val2 partition: 1 offset: 11
         * key: item0 val: val0 partition: 1 offset: 12
         * key: item1 val: val0 partition: 0 offset: 6
         * key: item2 val: val0 partition: 1 offset: 13
         * key: item0 val: val1 partition: 1 offset: 14
         * key: item1 val: val1 partition: 0 offset: 7
         * key: item2 val: val1 partition: 1 offset: 15
         * key: item0 val: val2 partition: 1 offset: 16
         * key: item1 val: val2 partition: 0 offset: 8
         * key: item2 val: val2 partition: 1 offset: 17
         * key: item0 val: val0 partition: 1 offset: 18
         * key: item1 val: val0 partition: 0 offset: 9
         * key: item2 val: val0 partition: 1 offset: 19
         * key: item0 val: val1 partition: 1 offset: 20
         * key: item1 val: val1 partition: 0 offset: 10
         * key: item2 val: val1 partition: 1 offset: 21
         * key: item0 val: val2 partition: 1 offset: 22
         *
         * **/
    }
    
  4. kafka的分区数据视图

    kafka的分区数据视图

  5. 发送消息的3种方式

    1. 发送并忘记:消息可能会被丢失 producer.send(record)

    2. 同步发送:等待kafka返回结果 producer.send(record).get()

    3. 异步发送:设置一个回调函数,记录&处理异常信息,需要实现Callback接口

      Future<RecordMetadata> send = producer.send(record, new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (exception != null) {
            exception.printStackTrace();
          }
      }});
      
  6. 生产者重要的配置参数

    1. acks参考:副本机制、同步机制、ISR机制
      1. acks=0,不会等待任何broker的响应,只是发送,消息丢失了不知道,为了吞吐量优先
      2. acks=1,集群首领(Leader)副本收到消息,会收到消息成功的响应。如果首领副本崩溃,如果消息还没有被复制到新的首领副本,则消息还是有可能丢失。
      3. acks=all(-1),所有副本全部收到消息时,生产者才会收到成功的响应
  7. 序列化器

    1. 强烈建议使用通用的序列化框架
  8. 分区

    1. 如果key为null,并且使用了默认的分区器,那么记录将被随机发送给主题的分区,分区器使用轮询调度算法将消息均衡分布给分区
    2. 如果有key的话,会对key进行hash取模(使用kafka自己的哈希算法,即使jdk升级,分区也不会改变)

消费者代码

  1. 消费者与消费者组

    1. 对消费者进行横向扩展,kafka消费者从属于消费者组。一个群组里的消费
  2. 消费者群组与分区再均衡

    1. 主动再均衡(stop the world)
    2. 协作再均衡(一部分分区会重新进行分配)
  3. 群组固定成员

    1. group.instance.id:固定成员的唯一id
    2. session.timeout.ms:大概表示的是,当固定成员关闭时,大概多长时间离开群组。如果这个参数设置的足够大,可以避免进行简单的应用程序重启时出发再均衡。又要设置得足够小,以便于出现严重停机时自动重新分配分区。
  4. 消费者代码

        @Test
        public void consumer0() {
            /**
             * ./bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
             **/
    
            //基础配置
            Properties p = new Properties();
            p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            //消费的细节
            p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer2");
            //KAKFA IS MQ  IS STORAGE
            p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//第一次启动,没有offset
            /**
             *         "What to do when there is no initial offset in Kafka or if the current offset
             *         does not exist any more on the server
             *         (e.g. because that data has been deleted):
             *         <ul>
             *             <li>earliest: automatically reset the offset to the earliest offset
             *             <li>latest: automatically reset the offset to the latest offset</li>
             *             <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li>
             *         </ul>";
             */
            p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//自动提交时异步提交,丢数据&&重复数据
            //一个运行的consumer ,那么自己会维护自己消费进度
            //一旦你自动提交,但是是异步的
            //1,还没到时间,挂了,没提交,重起一个consuemr,参照offset的时候,会重复消费
            //2,一个批次的数据还没写数据库成功,但是这个批次的offset背异步提交了,挂了,重起一个consuemr,参照offset的时候,会丢失消费
    
            p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000");//5秒
    //        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,""); // POLL 拉取数据,弹性,按需,拉取多少?
    
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
    
    
            //kafka 的consumer会动态负载均衡
            consumer.subscribe(Arrays.asList("test3"));
    
            while (true) {
                /**
                 * 常识:如果想多线程处理多分区
                 * 每poll一次,用一个语义:一个job启动
                 * 一次job用多线程并行处理分区,且job应该被控制是串行的
                 * 以上的知识点,其实如果你学过大数据
                 */
                //微批的感觉
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));// 0~n
    
    
                Iterator<ConsumerRecord<String, String>> iter = records.iterator();
                while (iter.hasNext()) {
                    //因为一个consuemr可以消费多个分区,但是一个分区只能给一个组里的一个consuemr消费
                    ConsumerRecord<String, String> record = iter.next();
                    int partition = record.partition();
                    long offset = record.offset();
                    String key = record.key();
                    String value = record.value();
    
                    System.out.println("key: " + record.key() + " val: " + record.value() + " partition: " + partition + " offset: " + offset);
                }
            }
        }
    
  5. 轮询

    1. max.poll.interval.ms:消费者poll()方法调用最大间隔时间,如果超过这个值,消费者将被认为已经“死亡”。
  6. 消费者配置

    1. fetch.max.wait.ms
    2. fetch.min.bytes
    3. fetch.max.bytes
    4. max.poll.records
    5. enable.auto.commit
  7. 提交偏移量

    1. 自动提交,每次间隔多长时间提交
    2. 提交当前偏移量:commitSync,同步提交,会发生阻塞。commitSync()将会提交poll()返回的最新偏移量
    3. 异步提交:commitASync,由于是异步提交,就不能保证先执行的先提交成功,比如偏移量为2000的因为通信原因在偏移量3000提交成功之后,再提交成功,就会出现消息重复消费,该方法支持回调
    4. 同步和异步
  8. 再均衡监听器

    1. ConsumerRebalanceListenner
      1. onPartitionsAssigned:再平衡已经结束,并且开始拉取消息之前,调用,可以用来找到正确的消费点位(偏移量)
      2. onPartitionsRevoked:再平衡开始之前,并且消费者停止读取消息之后调用,这里可以用来提交offset,用来记录消费点位。
  9. 从特定偏移量位置读取记录

    1. seekToBeginning(),从分区的起始位置读取消息
    2. seekToEnd(),从分区的末尾位置读取消息
  10. 消费者退出

  11. consumer.close()

  12. 反序列化器

思考

  1. offset可以按照什么粒度去维护的?
    1. 按照分区
  2. 手动提交的,维护offset的3种方式
    1. 按记录消费进度同步提交
    2. 按分区粒度同步提交
    3. 按当前poll的批次同步提交
  3. 无论是单线程还是多线程,需要poll的数据处理的事务和offset必须是一致的

Kafka学习笔记

4. ISR,OSR,AR,LW,HW,LEO,ACK原理理论

kafkaIO

零拷贝—找周老师的IO课

sendfile(in, offset,out)

kafka对数据只是发送,没有加工的过程

分区的可靠性(CAP)

要解决一个问题,可能会引入其他问题,比如一致性问题

一致性

  1. 强一致性
    1. 所有节点比必须ack
  2. 最终一致性
    1. 过半机制
  3. 弱一致性
    1. ISR(in-sync replicas同步副本),连通性&活跃性
    2. OSR(outof-sync replicas不同步副本),超过阈值时间(10s),没有心跳
    3. AR(Assignes replicas),面向分区的副本集合,创建topic的时候你给出了分区的副本数
    4. AR=ISR+OSR
  4. ack=-1的时候,多个broker的消息进度是一致的
    1. 会与ISR相关的节点进行ack
  5. tradeoff
    1. 不要强调磁盘的可靠性,转向异地多机的同步
    2. 如果拿磁盘做持久化,优先pagecache或者绝对磁盘
    3. 在多机集群分布式的时候,强一致性,最终一致性(过半,ISR)
    4. 总结:
      1. redis,宁可用HA,不用刻意追求AOF的准确性
      2. 像Kafka,我们追求ack=-1,要求磁盘的可靠性

kafka弹性存储

日志复制offset的概念

首先这里有两个Broker,也就是两台服务器,然后它们的分区中分别存储了两个 p0 的副本,一个是 Leader,一个是 Follower, 此时生产者开始往 Leader Partition 发送数据,数据最终写到磁盘上的。然后 Follower 会从 Leader那里去同步数据,Follower上的数据也会写到磁盘上。可是 Follower 是先从 Leader 那去同步然后再写入磁盘的,所以它磁盘上面的数据肯定会比 Leader 的那块少一些

概念

  1. LW:Low Watermark 低水位

    1. 分区中所有已经被成功写入但尚未被消费者消费的最小偏移量,用于跟踪已经消费的消息,管理消息的删除以及控制消费者的位置。
  2. HW:High Watermark,高水位

    1. 副已经被成功写入并且已经被所有 ISR(in-sync replicas)确认的最大偏移量,它用于控制消费者只能消费已经被完全确认的消息,并且用于管理消息的删除。
  3. LEO:LogEndOffset,日志末端位移:

    1. 副本中下一条待写入消息的offset

在 Kafka 中HW高水位的作用主要有2个:

  • 用来标识分区下的哪些消息是可以被消费者消费的。
  • 协助 Kafka 完成副本数据同步

而LEO一个重要作用就是用来更新HW:

  • 如果 Follower 和 Leader 的 LEO 数据同步了, 那么 HW 就可以更新了。
  • HW 之前的消息数据对消费者是可见的, 属于 commited 状态, HW 之后的消息数据对消费者是不可见的。

offset的HW和LEO概念

如上图所示: 每个副本会同时维护 HW 与 LEO 值:

  • Leader 保证只有 HW 及其之前的消息,才对消费者是可见的。

  • Follower 宕机后重启时会对其日志截断,只保留 HW 及其之前的日志消息(新版本有改动)

  • 对于日志末端位移, 即 Log End Offset (LEO)。它表示副本写入下一条消息的位移值。注意: 数字 12 所在的方框是虚线,说明这个副本当前只有 12 条消息,位移值是从 0 到 11,下一条新消息的位移是 12。显然,介于高水位和 LEO 之间的消息就属于未提交消息。即同一个副本对象,其高水位值不会大于 LEO 值。

  • 高水位和 LEO 是副本对象的两个重要属性。Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。即分区的高水位就是其 Leader 副本的高水位。

TradeOff

Keeping track of what has been consumed is, surprisingly, one of the key performance points of a messaging system. Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or it may wait for acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single machine server it is not clear where else this state could go. Since the data structures used for storage in many messaging systems scale poorly, this is also a pragmatic choice--since the broker knows what is consumed it can immediately delete it, keeping the data size small.

What is perhaps not obvious is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out or whatever) that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature which means that messages are only marked as sent not consumed when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems. First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice. The second problem is around performance, now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed). Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged.

Kafka handles this differently. Our topic is divided into a set of totally ordered partitions, each of which is consumed by exactly one consumer within each subscribing consumer group at any given time. This means that the position of a consumer in each partition is just a single integer, the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap.

There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.

Book

Designing Event-Driven System

Kafka:The Definitive Guide Second Edition

Kafka简介

Apache Kafka is an event streaming platform used to collect, process, store, and integrate data at scale. It has numerous use cases including distributed streaming, stream processing, data integration, and pub/sub messaging.

财富榜top100中,80%的公司用了kafka

分布式流处理,数据聚合,消息发布和订阅

Due to Kafka's high throughput, fault tolerance, resilience, and scalability, there are numerous use cases across almost every industry

高吞吐量,容错性,弹性和可扩展性

Data Integration
Metrics and Monitoring
Log Aggregation
Stream Processing
Publish-Subscribe Messaging

看到了这里:https://developer.confluent.io/what-is-apache-kafka/

kafka coordinator

"协调者"有些陌生,所谓协调者,在Kafka 中对应的术语是Coordinator,它专门为Consumer Group 服务,负责Group Rebalance 以及提供位移管理和组成员管理等

coordinator:https://zhuanlan.zhihu.com/p/148131411

kafka整体架构:https://cloud.tencent.com/developer/beta/article/1005708?areaSource=106001.1

官方文档:https://kafka.apache.org/documentation/#consumerconfigs

Offset:https://www.cnblogs.com/edisonchou/p/kafka_study_notes_part9.html

Kafka权威指南

第一章 初识kafka

  1. 数据为企业发展提供动力,从数据中获取信息,对它们进行分析处理,并生成更多的数据
    1. 就比如:在网站上浏览了感兴趣的商品,那么浏览信息就会转化成商品推荐,展示给我们
  2. 如何移动数据几乎变得与数据本身一样重要,数据管道会成为关键组件

相关名词

消息

kafka的数据单元,相当于数据库中的一条“记录”,由字节数组组成

消息中可选的元数据,也就是键,也是字节数组,相当于数据库分区键,用来指定分区的

批次

为提高效率,消息会被分成批次写入kafka,批次包含了一组属于同一个主题和分区的消息。

模式

消息传输模式,比如常见的JSON,XML,Apache Avro—Hadoop开发的一款序列化框架

主题和分区

通过主题进行分类,好比数据库的表,可以被分成若干个分区,一个分区就是一个提交日志,消息会以追加的方式被写入分区,然后按照先入先出的顺序读取。

用流来描述Kafka这类系统中的数据,把一个主题的数据看成一个表,流是一组从生产者移动到消费者的数据。Kafka streams,Apache samza,Storm

生产者

消息键与分区器

消费者

消费者通过检查消息的偏移量来区分已经读过的消息,偏移量是另一种元数据

消费者组

属于同一个消费者组的一个或多个消费者共同读取一个主题,消费者与分区之间的映射通常被称为消费者对分区的所有权关系。

broker和集群

leader,follower,副本,同步,保留消息在broker

Kafka的应用场景

  1. 活动跟踪
  2. 传递消息
  3. 指标和日志记录
  4. 提交日志
  5. 流式计算,真正的流式处理通常是指提供了类似map/reduce(Hadoop)处理功能的应用程序。

深入Kafka

讲解思路

  • Kafka 如何进⾏复制;
  • Kafka 如何处理来⾃⽣产者和消费者的请求;
  • Kafka 的存储细节,⽐如⽂件格式和索引。

集群成员关系

控制器broker

控制器

Kafka 使⽤ Zookeeper 的临时节点来选举控制器,并在节点加⼊集群或退出集群时通知 控制器。控制器负责在节点加⼊或离开集群时进⾏分区⾸领选举。控制器使⽤ epoch 来避免“脑 裂”。“脑裂”是指两个节点同时认为⾃⼰是当前的控制器。

控制器作用

Kafka 控制器组件(Controller) 即 Broker, 是 Kafka 的核心组件。它的主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一台 Broker 都能充当控制器的角色,但是在运行过程中,只能有一个 Broker 成为控制器,来执行管理和协调的职责。换句话说,每个正常运转的 Kafka 集群,在任意时刻都有且只有一个控制器。下面我们就来详细聊聊控制器的原理和内部运行机制。

  1. topic管理
  2. 分区重分配
  3. Leader选举
  4. 集群成员管理
  5. 提供数据服务
    1. 控制器会向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

复制

在 Kafka 的⽂档⾥,Kafka 把⾃⼰描述成“⼀个分布式的、可分区 的、可复制的提交⽇志服务”。

⾸领副本

每个分区都有⼀个⾸领副本。为了保证⼀致性,所有⽣产者请求和消费者请求都会经过这个副 本。 跟随者副本 ⾸领以外的副本

跟随者副本

⾸领以外的副本都是跟随者副本。跟随者副本不处理来⾃客户端的请求,它们唯⼀的任务就是从 ⾸领那⾥复制消息,保持与⾸领⼀致的状态。如果⾸领发⽣崩溃,其中的⼀个跟随者会被提升为新⾸ 领。

为了与⾸领保持同步,跟随者向⾸领发送获取数据的请求,这种请求与消费者为了读取消息⽽发送的 请求是⼀样的。⾸领将响应消息发给跟随者。请求消息⾥包含了跟随者想要获取消息的偏移量,⽽且 这些偏移量总是有序的。

同步的副本

持续请求得到的最新消息副本被称为同步的副本。在⾸领发⽣失效时,只有同步副本才有可能被 选为新⾸领。

不同步副本

跟随者的正常不活跃时间或在成为不同步副本之前的时间是通过 replica.lag.time.max.ms 参数 来配置的。

元数据请求

客户端怎么知道该往哪⾥发送请求呢?客户端使⽤了另⼀种请求类型,也就是元数据请求。这种请求包含了客户端感兴趣的主题列表。服务器端的响应消息⾥指明了这些主题所包含的分区、每个分区都有哪些副本,以及哪个副本是⾸领。元数据请求可以发送给任意⼀个 broker,因为所有 broker 都缓存了这些信息。

包含⾸领副本的 broker 在收到⽣产请求时,会对请求做⼀些验证

发送数据的⽤户是否有主题写⼊权限? 请求⾥包含的 acks 值是否有效(只允许出现 0、1 或 all)? 如果 acks=all,是否有⾜够多的同步副本保证消息已经被安全写⼊?(我们可以对 broker 进 ⾏配置,如果同步副本的数量不⾜,broker 可以拒绝处理新消息。在第 6 章介绍 Kafka 持久性 和可靠性保证时,我们会讨论更多这⽅⾯的细节。)

炼狱

在消息被写⼊分区的⾸领之后,broker 开始检查 acks 配置参数——如果 acks 被设为 0 或 1,那 么 broker ⽴即返回响应;如果 acks 被设为 all,那么请求会被保存在⼀个叫作炼狱的缓冲区⾥, 直到⾸领发现所有跟随者副本都复制了消息,响应才会被返回给客户端。

零复制技术

Kafka 使⽤零复制技术向客户端发送消息——也就是说,Kafka 直接把消息从⽂件(或者更确 切地说是 Linux ⽂件系统缓存)⾥发送到⽹络通道,⽽不需要经过任何中间缓冲区。这是 Kafka 与 其他⼤部分数据库系统不⼀样的地⽅,其他数据库在将数据发送给客户端之前会先把它们保存在本地 缓存⾥。这项技术避免了字节复制,也不需要管理内存缓冲区,从⽽获得更好的性能。

Kafka 最为常⻅的⼏种请求类型

元数据请求、⽣产请求和获取请求

kafka的升级

协议在持续演化——随着客户端功能的不断增加,我们需要改进协议来满⾜需求。例如,之前的 Kafka 消费者使⽤ Zookeeper 来跟踪偏移量,在消费者启动的时候,它通过检查保存在 Zookeeper 上的偏移量就可以知道从哪⾥开始处理消息。因为各种原因,我们决定不再使⽤ Zookeeper 来保存偏移量,⽽是把偏移量保存在特定的 Kafka 主题上。

可靠性

可靠性是系统的⼀个属性,⽽不是⼀个独⽴的组件,所以在讨论 Kafka 的可靠性保证时,还是要从系统的整体出发。

有些场景要求很⾼的可靠性,⽽有些则更看重速度和简便性。

Kafka 被设计成⾼度可配置的,⽽且它的客户端 API 可以满⾜不同程度的可靠性需求。

讲解思路

  1. 本章先讨论各种各样的可靠性及其在 Kafka 场景中的含义。
  2. 然后介绍 Kafka 的复制功能,以及 它是如何提⾼系统可靠性的。
  3. 随后探讨如何配置 Kafka 的 broker 和主题来满⾜不同的使⽤场景需求,也会涉及⽣产者和消费者以及如何在各种可靠性场景⾥使⽤它们。
  4. 最后介绍如何验证系统的可靠性,因为系统的可靠性涉及⽅⽅⾯⾯——⼀些前提条件必须先得到满⾜。

可靠性保证

只有当消息被写⼊分区的所有同步副本时(但不⼀定要写⼊磁盘),它才被认为是“已提交”的。

复制

Kafka 的主题被分为多个分区,分区是基本的数据块。分区存储在单个磁盘上,Kafka 可以保证分区 ⾥的事件是有序的,分区可以在线(可⽤),也可以离线(不可⽤)。每个分区可以有多个副本,其 中⼀个副本是⾸领。所有的事件都直接发送给⾸领副本,或者直接从⾸领副本读取事件。其他副本只 需要与⾸领保持同步,并及时复制最新的事件。当⾸领副本不可⽤时,其中⼀个同步副本将成为新⾸ 领。

分区⾸领是同步副本,⽽对于跟随者副本来说,它需要满⾜以下条件才能被认为是同步的。

  1. 与 Zookeeper 之间有⼀个活跃的会话,也就是说,它在过去的 6s(可配置)内向 Zookeeper 发送过⼼跳。
  2. 在过去的 10s 内(可配置)从⾸领那⾥获取过消息。 在过去的 10s 内从⾸领那⾥获取过最新的消息。
  3. 光从⾸领那⾥获取消息是不够的,它还必须是⼏ 乎零延迟的。

如果跟随者副本不能满⾜以上任何⼀点,⽐如与 Zookeeper 断开连接,或者不再获取新消息,或者 获取消息滞后了 10s 以上,那么它就被认为是不同步的。

broker配置

broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。

复制系数

replication.factor,主题级别的配置参数,因为 Kafka 的默认复制系数就是 3——不过⽤户可以修改它。即使是在 主题创建之后,也可以通过新增或移除副本来改变复制系数。

如果复制系数为 N,那么在 N-1 个 broker 失效的情况下,仍然能够从主题读取数据或向主题写⼊数 据。所以,更⾼的复制系数会带来更⾼的可⽤性、可靠性和更少的故障。另⼀⽅⾯,复制系数 N 需要 ⾄少 N 个 broker,⽽且会有 N 个数据副本,也就是说它们会占⽤ N 倍的磁盘空间。我们⼀般会在 可⽤性和存储硬件之间作出权衡。

副本的分布也很重要。默认情况下,Kafka 会确保分区的每个副本被放在不同的 broker 上。

为了避免机架级别的故障,我们建议把 broker 分布在多个不同的机架上,并使⽤ broker.rack 参数来为每个 broker 配置所在机架的名字。如果 配置了机架名字,Kafka 会保证分区的副本被分布在多个机架上,从⽽获得更⾼的可⽤性。

不完全的⾸领选举

unclean.leader.election.enable只能在 broker 级别(实际上是在集群范围内)进⾏配置,它的默认 值是 true。

我们之前提到过,当分区⾸领不可⽤时,⼀个同步副本会被选为新⾸领。如果在选举过程中没有丢失 数据,也就是说提交的数据同时存在于所有的同步副本上,那么这个选举就是“完全”的。 但如果在⾸领不可⽤时其他副本都是不同步的,我们该怎么办呢?

简⽽⾔之,如果我们允许不同步的副本成为⾸领,那么就要承担丢失数据和出现数据不⼀致的⻛险。 如果不允许它们成为⾸领,那么就要接受较低的可⽤性,因为我们必须等待原先的⾸领恢复到可⽤状 态。

如果把 unclean.leader.election.enable 设为 true,就是允许不同步的副本成为⾸领(也就 是“不完全的选举”),那么我们将⾯临丢失消息的⻛险。如果把这个参数设为 false,就要等待原先 的⾸领重新上线,从⽽降低了可⽤性。

银行系统,信用卡支付事务/实时点击流分析系统

最少同步副本

在主题级别和 broker 级别上,这个参数都叫 min.insync.replicas。

根据 Kafka 对可 靠性保证的定义,消息只有在被写⼊到所有同步副本之后才被认为是已提交的。

对于 ⼀个包含 3 个副本的主题,如果 min.insync.replicas 被设为 2,那么⾄少要存在两个同步副本 才能向分区写⼊数据。 如果 3 个副本都是同步的,或者其中⼀个副本变为不可⽤,都不会有什么问题。不过,如果有两个副 本变为不可⽤,那么 broker 就会停⽌接受⽣产者的请求。尝试发送数据的⽣产者会收到 NotEnoughReplicasException 异常。消费者仍然可以继续读取已有的数据。实际上,如果使⽤ 这样的配置,那么当只剩下⼀个同步副本时,它就变成只读了,这是为了避免在发⽣不完全选举时数 据的写⼊和读取出现⾮预期的⾏为。为了从只读状态中恢复,必须让两个不可⽤分区中的⼀个重新变 为可⽤的(⽐如重启 broker),并等待它变为同步的

在可靠的系统⾥使⽤⽣产者

  1. 根据可靠性需求配置恰当的 acks 值。
  2. 在参数配置和代码⾥正确处理错误。

acks(0,1,all)

⼀般情况下,如果你的⽬标是不丢失任何消息,那么最好让⽣产者在遇到可重试错误时能够保持重 试。

重试发送⼀个已经失败的消息会带来⼀些⻛险,如果两个消息都写⼊成功,会导致消息重 复。重试和恰当的错误处理可以保证每个消息“⾄少被保存⼀次”,但当前 的 Kafka 版本(0.10.0)⽆法保证每个消息“只被保存⼀次”。

在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture)或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  1. 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  2. 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  3. 精确一次(exactly once):消息不会丢失,也不会被重复发送。

使⽤⽣产者内置的重试机制可以在不造成消息丢失的情况下轻松地处理⼤部分错误,不过对于开发⼈ 员来说,仍然需要处理其他类型的错误,包括:

  • 不可重试的 broker 错误,例如消息⼤⼩错误、认证错误等;
  • 在消息发送之前发⽣的错误,例如序列化错误;
  • 在⽣产者达到重试次数上限时或者在消息占⽤的内存达到上限时发⽣的错误
  1. 丢弃“不合法的消息”?
  2. 把错误记录下来?
  3. 把这些消息保存在本地 磁盘上?
  4. 回调另⼀个应⽤程序?

具体使⽤哪⼀种逻辑要根据具体的架构来决定。只要记住,如果错误处理只是为了重试发送消息,那么最好还是使⽤⽣产者内置的重试机制。

在可靠的系统⾥使⽤消费者

消费者的可靠性配置

  1. group.id
  2. auto.offset.reset
  3. enable.auto.commit
  4. auto.commit.interval.ms

精准一次、幂等性写⼊

总结

正如我们在本章开头所说的,可靠性并不只是 Kafka 单⽅⾯的事情。我们应该从整个系统层⾯来考虑 可靠性问题,包括应⽤程序的架构、⽣产者和消费者 API 的使⽤⽅式、⽣产者和消费者的配置、主题 的配置以及 broker 的配置。系统的可靠性需要在许多⽅⾯作出权衡,⽐如复杂性、性能、可⽤性和 磁盘空间的使⽤。掌握 Kafka 的各种配置和常⽤模式,对使⽤场景的需求做到⼼中有数,你就可以在 应⽤程序和 Kafka 的可靠性程度以及各种权衡之间作出更好的选择。

Asks=-1,会导致程序阻塞,

讲解思路

  1. AKF
  2. 可靠性保证
  3. kafka的使用场景

ChatGPT写的一篇《Kafka权威指南》总结

《Kafka权威指南》是一本介绍Apache Kafka的技术书籍,是Kafka领域的经典著作之一。在这篇技术分享中,我将从以下几个方面介绍Kafka的核心概念和应用场景。

chatpgt

1. Kafka的基本概念

Kafka是一个分布式流处理平台,主要用于实时处理大规模数据流。在Kafka中,数据被组织成一个或多个主题(Topic),每个主题包含一个或多个分区(Partition),每个分区又由多个消息(Message)组成。生产者(Producer)将消息发送到指定的主题中,而消费者(Consumer)则从指定的主题中读取消息。此外,Kafka还提供了一个可靠的消息传输机制,确保消息在发送和接收过程中不会丢失或重复。

2. Kafka的应用场景

Kafka的应用场景非常广泛,特别适合以下几个方面:

  • 实时数据采集和处理:Kafka可以作为数据采集和处理的中间件,用于收集和处理来自各种数据源的实时数据流,例如日志数据、传感器数据、网络数据等。
  • 大规模数据处理:Kafka可以作为数据集成和分发的中间件,用于支持大规模数据处理和分析,例如批处理、流处理、机器学习等。
  • 消息队列:Kafka可以作为高吞吐量、低延迟的消息队列,用于处理大量的实时消息,例如在线交易、聊天应用、游戏等。

3. Kafka的优势和挑战

Kafka具有以下几个优势:

  • 高吞吐量:Kafka能够支持每秒数百万条消息的高吞吐量,适用于大规模数据处理和实时数据流处理场景。
  • 可扩展性:Kafka可以通过添加新的节点来扩展性能和容量,支持水平扩展和垂直扩展。
  • 可靠性:Kafka提供了可靠的消息传输机制,确保消息不会丢失或重复,支持多副本备份和故障转移。
  • 灵活性:Kafka的消息存储和处理机制非常灵活,可以灵活配置主题、分区和消费者组等参数。

然而,Kafka也面临一些挑战:

  • 复杂性:Kafka的配置和管理比较复杂,需要投入大量的人力和物力来维护和管理。
  • 学习成本:Kafka的核心概念和工作原理比较复杂,需要花费一定的时间和精力来学习和理解。
  • 系统稳定性:Kafka的稳定性和可靠性受到很多因素的影响,例如网络延迟、磁盘故障、节点故障等。

4. Kafka的应用实践

Kafka在实际应用中有很多应用场景和应用实践,例如:

  • 数据集成和分发:使用Kafka将各种数据源集成到一个统一的数据平台中,支持批量处理和实时处理等多种数据处理方式。
  • 实时日志处理:使用Kafka作为日志的收集和分发平台,支持日志的实时处理和分析,例如日志监控、异常检测等。
  • 流处理:使用Kafka作为流处理平台,实现实时数据流的处理和分析,例如实时计算、实时推荐、实时预测等。
  • 消息队列:使用Kafka作为高吞吐量、低延迟的消息队列,支持在线交易、聊天应用、游戏等高并发场景。

除此之外,Kafka还可以和其他大数据技术结合使用,例如和Hadoop、Spark、Flink等进行数据集成和分析,形成一个完整的大数据处理和分析体系。

5. 从哪些方面入手Kafka

为了更好地应用Kafka,建议从以下几个方面入手:

  • 学习Kafka的核心概念和工作原理,理解Kafka的消息传输机制、主题、分区、生产者、消费者等重要概念。
  • 熟悉Kafka的应用场景和应用实践,了解Kafka在实际应用中的优势和挑战,以及如何将Kafka应用于不同的场景。
  • 掌握Kafka的配置和管理技巧,了解如何优化Kafka的性能和稳定性,如何配置Kafka的主题、分区、消费者组等参数。
  • 学习Kafka的客户端API和工具,掌握如何使用Kafka的生产者API、消费者API、管理工具等,进行数据传输、数据处理和数据管理。
  • 掌握Kafka的监控和调试技巧,了解如何监控Kafka的运行状态、诊断Kafka的故障、调优Kafka的性能等。

总之,Kafka是一款非常强大的分布式流处理平台,具有广泛的应用场景和应用实践,但是也需要投入大量的人力和物力来维护和管理。通过深入学习和实践Kafka,可以更好地应用Kafka,实现大规模数据处理和实时数据流处理的需求。

6. 常见的问题和解决方案

另外,值得注意的是,Kafka在实际应用中也存在一些常见的问题和挑战,例如:

  • 消费者组的负载均衡问题:Kafka的消费者组需要进行负载均衡,确保每个消费者处理相同数量的消息,避免出现负载不均衡的情况。
  • 消费者的偏移量管理问题:Kafka的消费者需要管理自己的偏移量,避免出现重复消费或漏消费的情况。
  • 消息丢失和重复问题:尽管Kafka提供了可靠的消息传输机制,但是在实际应用中仍然可能出现消息丢失或重复的情况。

针对这些问题,可以采取一些措施来解决,例如:

  • 使用消费者组协调器来实现消费者组的负载均衡,确保每个消费者处理相同数量的消息。
  • 使用Kafka的偏移量管理机制,确保每个消费者能够正确地管理自己的偏移量,避免出现重复消费或漏消费的情况。
  • 配置Kafka的消息保留策略和副本数,确保消息不会丢失或重复。

最后,需要注意的是,Kafka的应用需要结合具体的业务场景和需求来考虑,需要根据实际情况进行配置和调优,才能发挥Kafka的最佳性能和效果。

7. Kafka的生态系统

除此之外,Kafka的生态系统也非常丰富,有很多与Kafka相关的工具和技术,例如:

  • Kafka Connect:Kafka Connect是一款开源的数据集成工具,可以将Kafka与其他数据源和数据目的地进行集成,支持多种数据格式和协议。
  • Kafka Streams:Kafka Streams是一款开源的流处理框架,可以对Kafka中的数据进行实时处理和分析,支持多种数据操作和聚合操作。
  • Confluent Platform:Confluent Platform是一款基于Kafka的企业级数据中心,提供了一系列与Kafka相关的工具和服务,例如Schema Registry、Kafka REST Proxy、Control Center等。
  • Apache NiFi:Apache NiFi是一款开源的数据流处理工具,可以将Kafka与其他数据源和数据目的地进行集成,支持多种数据格式和协议。
  • Apache Beam:Apache Beam是一款开源的分布式数据处理框架,可以对Kafka中的数据进行批处理和流处理,支持多种数据操作和聚合操作。

这些工具和技术可以帮助用户更好地应用Kafka,实现更多的数据处理和分析需求。同时,也需要注意选择合适的工具和技术,根据实际场景和需求进行选择和配置,才能实现最佳的性能和效果。

8. 总结

作为一款分布式流处理平台,Kafka具有高吞吐量、可扩展性、可靠性和灵活性等特点,适用于大规模数据处理和实时数据流处理场景。同时,Kafka也面临着复杂性、学习成本和系统稳定性等挑战,需要投入大量的人力和物力来维护和管理。在实际应用中,Kafka可以应用于数据集成和分发、实时日志处理、流处理和消息队列等多种场景,并且可以和其他大数据技术结合使用,形成一个完整的大数据处理和分析体系。

Apache Kafka实战

深入理解Kafka

Apache Kafka源码剖析

Kafka简介

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Confluent: Set Your Data in Motion

Confluent is creating the foundational platform for data-in-motion

提供的新架构,解决方案

事件流处理平台

事件处理

上次学到这里

Event Streaming Patterns:https://developer.confluent.io/patterns/event/event/

inside-ksqldb:https://developer.confluent.io/learn-kafka/inside-ksqldb/streaming-architecture/

笔记

Kafka

Kafka是事件处理平台,提供了事件存储能力Kafka的Log,事件计算KSQLDB,Kafka Stream。

Kafka Ecosystem: Kafka,Kafka Stream,Kafka Connect,KSQLDB.

事件处理中出现了问题,那么Materialize可以是发邮件,也可以是存到存储系统中。

kafka发送消息是k,v类型的。没发送一个消息,offset+1 m https://developer.confluent.io/learn-kafka/event-sourcing/event-driven-vs-state-based/

软件设计方法:

state-based(databases+ synchronous network call)

event-based(data at rest + data in motion + event sourcing + CQRS + event streaming)

CRUD:Databases

CR:Event Sourcing

Topic:Event:Key:Partition,Schema

Table:Row:Primary Key,Shard,Schema

Index:Document:_id,Shard,Mapping

Collection:Document:DocumentId,Shard,Schema

Kafka Queue is Log:Append Only and Durable,Reading Message never delete. Other Message is Queue. Message is Bounded Buffer can be deleted. Topic is Log. Not Queue. Topic(DLT) is Queue(DLQ).

same key -> same partition -> in order ,key is null -> round robin ,key is nonull -> hash function

rewind,reprocess,replayable,reblance,

consumer group protocol(join+leave)

Kafka Connector

Consumer+Materialize = Kafka Connect ,Consumer+Stateful = Kafka Stream ,Consumer+Stateless = Kafka Consumer

数据是流动的处理平台(Kafka)和数据是静止的处理平台(MySQL)

DLQ:Dead Letter Queue

Kafka Stream

KTable:Last Updated Value

Kafka Core: Log+Event

Kafka Stream = KStream,KTable,Serialization,Joins,Stateful Operations,Windowing,Times,Processor,

Joins:Stream-Stream(Windows),Stream-Table,Table-Table

Stateless:filter,map, Stateful(use pre event):group by key,reduce,aggregation(sum,count,avg,max,min),join

Time Windows: https://kafka.apache.org/30/javadoc/org/apache/kafka/streams/kstream/TimeWindows.html

Session Windows: https://kafka.apache.org/30/javadoc/org/apache/kafka/streams/kstream/SessionWindows.html

SlidingWindows: https://kafka.apache.org/30/javadoc/org/apache/kafka/streams/kstream/SlidingWindows.html

ksqlDB

ksqlDB:distributed compute layer kafka:distributed storage layer

Streams:unbounded series of event. Table: the current state of event

Stateful Aggregations (Materialized Views)

ksqlDB can build a materialized view of state

Kafka is a database turned inside out

Data Mesh

software architecture vs data architecture

service mesh vs data mesh

配置

broker配置

default.replication.factor

unclean.leader.election.enable

min.insync.replicas

producer配置

acks(0,1,all)

retries

设计

https://docs.confluent.io/platform/current/kafka/design.html

实现

FAQ

https://developer.confluent.io/learn/apache-kafka-faqs/

其他

Kafka vs. Pulsar vs. RabbitMQ

Kafka Stream vs Flink

The Streams API in Kafka and Flink are used in both capacities. The main distinction lies in where these applications live — as jobs in a central cluster (Flink), or inside microservices (Streams API).

The Streams API makes stream processing accessible as an application programming model, that applications built as microservices can avail from, and benefits from Kafka’s core competency —performance, scalability, security, reliability and soon, end-to-end exactly-once — due to its tight integration with core abstractions in Kafka. Flink, on the other hand, is a great fit for applications that are deployed in existing clusters and benefit from throughput, latency, event time semantics, savepoints and operational features, exactly-once guarantees for application state, end-to-end exactly-once guarantees (except when used with Kafka as a sink today), and batch processing.

疑问

kafka一个topic多个消费组情况下,broker是怎么管理log的offset的呢,是每一个消费组提交自己的呢,还是共同提交?

Operations

消费者操作

打印指定偏移量的消费者的消息

./kafka-console-consumer.sh --bootstrap-server brokerIP:brokerPort --topic yourTopic --partition 0 --offset 19831988 --property print.key=true --max-messages 1

其他属性有: print.timestamp Set to true to display the timestamp of each message (if available). print.key Set to true to display the message key in addition to the value. print.offset Set to true to display the message offset in addition to the value. print.partition Set to true to display the topic partition a message is consumed from. key.separator Specify the delimiter character to use between the message key and message value when printing. line.separator Specify the delimiter character to use between messages. key.deserializer Provide a class name that is used to deserialize the message key before printing. value.deserializer Provide a class name that is used to deserialize the message value before printing.

消费组操作

查看消费组

./kafka-consumer-groups.sh --bootstrap-server brokerIP:brokerPort --describe --group yourGroup

重置消费者偏移量

./kafka-consumer-groups.sh --bootstrap-server brokerIP:brokerPort --group yourGroup --topic yourTopic --reset-offsets --to-offset yourOffset --execute

./kafka-consumer-groups.sh --bootstrap-server brokerIP:brokerPort --group yourGroup --topic yourTopic --reset-offsets -to-latest --execute

Streaming

book:making sense of stream processing

Data Integration Problem: Log VS Dual Write

The database is a cache of a subset of the log.

表:更新流,对事实的更新 流:事件流,无界事件

Akka

lightbend-academy

reactive programming vs reactive system vs reactive architecture

Reactive Programming doesn't mean you have created a Reactive System

Reactive System: Reactive Microservice

A system that uses Reactive Programming is not necessarily Reactive.

Location Transport VS Transport Remoting

Reactive Architecture:DDD,Lagom

Reactive Architecture + DDD = same goal

Many of the guidelines and rules in Domain Driven Design are compatible with those in Reactive Architecture.

Akka简介

Build powerful reactive, concurrent, and distributed applications more easily

Akka is a toolkit for building highly concurrent, distributed, and resilient message-driven applications for Java and Scala.

提供的新架构,解决方案

反应式架构,反应式系统,反应式编程

Reactive Systems Architecture

Reactive Architecture:LightBend

The primary goal of reactive architecture is to provide an experience that is responsive under all conditions.

unresponsive software

Lightbend=Akka+Play+Lagom

akka监控:https://www.lightbend.com/blog/akka-monitoring-telemetry

其他

Lightbend CEO Jonas Boner-blog

Jonas Boner-oreilly

Jonas Boner's Book: Reactive Microservices Architecture Reactive Microsystems The Reactive Principles - Design Principles for Distributed Applications

foreword—reactive Messaging Patterns with the Actor Model - 《响应式架构》 foreword—reactive application development - 《反应式应用开发》 foreword—reactive design patterns - 《反应式设计模型》 foreword—functional and reactive domain modeling - 《函数响应式领域建模》

Readlist: http://jonasboner.com/books-that-makes-you-think/

My Repo

https://github.com/xiaozhiliaoo/akka-in-action.git

https://github.com/xiaozhiliaoo/akka-practice

https://github.com/xiaozhiliaoo/learning-akka.git

https://github.com/SalmaKhater/Learning-Akka

使用场景

可扩展的分布式内存数据库

Akka Actor

Progress

Notes

tell,do not ask!

发消息方法: tell(fire and forget) ask(get response) pub/sub

acotr单线程模型,并发安全

become 通过行为改变状态

当actor中出现futrue时候,可以使用pipeline,否则会打破单线程模式

akka,riak,cassandra

ActorRef,ActorPath,Actor Selection

Replace Actor stack behavior vid become/unbecome/FSM

Actor如果是计数器,那么需要持久化。Stateful Actor

Sender -> Command -> Actor -> Event -> DB

线上推荐akka cluster,而不是remote

Send messages:Tell:fire-forget,Ask:Send-And-Receive-Future

Forward message: useful when writing actors that work as routers, load-balancers, replicators etc.

Reference

https://doc.akka.io/docs/akka/current/actors.html#actor-lifecycle

Akka Remoting

Classic Remoting (Deprecated) -> Typed Artery Remoting

https://doc.akka.io/docs/akka/current/remoting.html

akka通信协议,序列化机制是什么?

Akka Http

akka-http收http://spray.io/启发

Akka http = Akka + Akka stream

Akka TestKit

Akka Stream

Akka Distributed Data

多点写入冲突解决: 具有中央节点的数据修改,也涉及CRDT,因为时间有先后,但是最终选哪一个也是问题。多节点更新,然后节点同步达到一致性时候,会有冲突。

除非所有写入本身就存储在一个中央节点,

akka生态至少把akka项目看完,lightbend生态

core

convergent(收敛)

集群中节点间复制简单的会话信息。ORSet Observer Remove Set

https://github.com/lmdbjava/lmdbjava/wiki

paper: Delta State Replicated Data Types

Akka Cluster

学习进度

https://doc.akka.io/docs/akka/current/typed/cluster.html#

笔记

application membership(heartbeat, gossip)

CAP:

available(pick up consistency):membership,distributed data,pubsub,multi-dc support consistency:singleton,sharding,leases

stateful applications

集群单例生成唯一用户ID

Akka Persistent + Akka Singleton 生成唯一用户ID

计算集群总请求数。

发现资源不够,会自动扩展actor来处理任务,资源充足,会减少actor来处理任务

全局状态:集群单例actor实现

actor中有阻塞操作该怎么办?

如果一个分片挂了,那么akka cluster shard是如何处理任务的?

集群路由感知器比集群单例更通用的构造。一个actor可以将任务推给多个worker.

Classic Cluster Aware Router:https://doc.akka.io/docs/akka/current/cluster-routing.html

Akka Persistence

Classic Persistence:https://doc.akka.io/docs/akka/current/persistence.html#at-least-once-delivery

Notes

https://github.com/dnemov/akka-persistence-jdbc/blob/master/src/main/resources/schema/mysql/mysql-schema.sql

Akka Utilities

熔断器

https://doc.akka.io/docs/akka/current/common/circuitbreaker.html

let it crash. 从奔溃中恢复才是弹性系统应该具备的能力。

Book

Akka实战

Akka入门与实践

Akka应用模式

反应式设计模式

反应式应用开发

Progress

Notes

Akka的actor模型中,并没有提供因果一致性,因此落在了开发人员,需要通过Process Manager模式完成Become/Unbecome实现因果一致性。

反应式架构默认是分布式的。

Akka分片:跨集群中多个节点自动分布具有标识符的actor.

Question

消息,命令,事件区别?

Hazelcast

Consul简介

Consul is a service mesh solution providing a full featured control plane with service discovery, configuration, and segmentation functionality.

Hashicorp:delivers consistent workflows to provision, secure, connect, and run any infrastructure for any application.

提供的新架构,解决方案

FAQ

线上容器里面是怎么连上的consul?

consul使用模式?线上部署模式?教务总共三个node,service全部在node上,其他电商的node就是service.

service -> consul client(3 pod) -> consul server()

Consul Agent,Server,Client,Consul cluster,

Using Lib

memdb

bolt

serf

https://github.com/Ecwid/consul-api

Paper

Wiki

Network_tomography

Failure_detector

Hazelcast

Hazelcast简介

Hazelcast is a streaming and memory-first application platform for fast, stateful, data-intensive workloads on-premises, at the edge or as a fully managed cloud service.

产品有:Hazelcast Management Center,IMDG,Jet

学习进度

提供的新架构,解决方案

笔记

HZ核心设计文档:https://github.com/hazelcast/hazelcast/blob/master/docs/design/template.md

Bridging Between Java 8 Streams and Hazelcast Jet

In-Memory Storage/In-Memory Compute/Real-Time Processing

Distributed Data Design(Partitioning and Replication): AP:Replication(lazy replication), primary-copy, best-effort, no strong consistency but monotonic reads guarantee, anti-entropy,at-least-once, CP: Consensus algorithms Raft

嵌入式是否支持”跨应用”发现彼此?

membership具体细节,如何加入和如何退出,以及数据迁移细节官方文档涉及比较少。

Build Distributed System

Core Object:Config,DistributedObject,Node,NodeState,Cluster,HazelcastInstance

FD:PhiAccrualFailureDetector,PhiAccrualClusterFailureDetector,DeadlineClusterFailureDetector,PingFailureDetector

Starter

https://docs.hazelcast.com/hazelcast/5.1/pipelines/spring-boot

Paper

Phi Accrual Failure Detector

Group membership and view synchrony in partitionable asynchronous distributed systems: specifications

cluster

Discovery Mechanisms

Auto Detection:multicast TCP/IP Cloud Discovery

Distributed Data Structures

数据结构选择依据:1 是否partitioned 2 AP or CP保证

AP:

Jackson

Jackson简介

ElasticStack

ElasticSearch

学习进度

看到这里了:https://www.elastic.co/guide/cn/elasticsearch/guide/current/_preventing_combinatorial_explosions.html

笔记

es历史

shay banon

Near Real Time

节点类型

打分机制:从TF-IDF改成BM25,也叫similarity ,scoring,ranking

oversharding问题,如何创建分片数?

如何知道数据在集群中的哪个节点?

es和mysql数据模型区别?

结构化搜索和全文搜索

全文搜索:传统数据库确实很难搞定的任务,传统数据库要么匹配,要么不匹配,es有相关性打分。

PB级别,数百台服务器

一个分片是一个 Lucene 的实例,以及它本身就是一个完整的搜索引擎

每个字段的所有数据都是默认被索引的

index是逻辑概念,面向用户,shard是物理概念,面向机器,应用程序关心索引,而不是分片

Elasticsearch中文档是不可改变的,不能修改它们,在内部,Elasticsearch 已将旧文档标记为已删除,并增加一个全新的文档,并且会重新进行索引。

更新文档:标记删除,创建文档,重新索引,检索-修改-重建索引

文档中的每个字段都将被索引并且可以被查询

在分布式系统中深度分页:在分布式系统中,对结果排序的成本随分页的深度成指数上升

filed:精确值,全文值

很少对全文类型的域做精确匹配

Doc Values:排序,聚合,脚本计算 Invert Index:检索

文档的唯一性:index,type,routing value

检索过程:query then fetch 先查后取 分片节点:from+size 协调节点:numberOfShard * (from+size)

scroll 查询禁用排序

搜索类型:query then fetch,dfs_query_then_fetch

深度分页的代价根源是结果集全局排序,如果去掉全局排序的特性的话查询结果的成本就会很低。

Data Replication:Primary-Backup

in-sync shard(可以被选中为primary的shard)

index配置最重要的是:number_of_shards(创建后更改不了),number_of_replicas(创建后可以修改)

_id 和 _index 字段则既没有被索引也没有被存储,这意味着它们并不是真实存在的。

不能添加新的分析器或者对现有的字段做改动。 如果你那么做的话,结果就是那些已经被索引的数据就不正确, 搜索也不能正常工作。reindex

修改索引类型:https://www.elastic.co/guide/cn/elasticsearch/guide/current/index-aliases.html

应用中使用索引别名,而不是索引真实名字。这样在修改索引时候,应用层不需要变化。

全文检索=全字段索引

倒排索引被写入磁盘后是 不可改变 的 在保留不变性的前提下实现倒排索引的更新?答案是: 用更多的索引。

一个 Lucene 索引包含一个提交点和三个段

一个 Lucene 索引 我们在 Elasticsearch 称作 分片 。 一个 Elasticsearch 索引 是分片的集合

段是不可改变的

内存索引缓冲区

按段搜索

写入和打开一个新段的轻量的过程叫做 refresh,每1s refresh下

Elasticsearch是近实时搜索: 文档的变化并不是立即对搜索可见,但会在一秒之内变为可见

Flush:执行一个提交并且截断 translog 的行为在 Elasticsearch 被称作一次 flush,30分钟一次。 Flush之后段被全量提交,并且事务日志被清空

减少段的数量(通常减少到一个),来提升搜索性能。

结构化搜索:要么在,要么不在,和打分机制无关系。也就是和相似度无关。

精确值查找时候,需要用filter,不会被打分,会被缓存。

尽可能多的使用过滤式查询。

constant_score(常量评分查询) 将 term 查询转化成为过滤器。

term查询转成constant_score查询。(非评分查询)

非评分计算是首先执行的,这将有助于写出高效又快速的搜索请求。

bool过滤器,也叫复合过滤器。

查看索引别名:GET /finance_netease_settle_order/_alias

布尔过滤器可以用来作为构造复杂逻辑条件的基本构建模块。

term 和 terms 是 包含(contains) 操作,而非 等值(equals)

查询优化:普通查询 -> bool filter -> constant_score filter

exists(!=null),missing(==null)查询

filter query 实现bitset的roraing bitmap

bool match查询

多数字符串字段都是 not_analyzed 精确值字段

dfs_query_then_fetch: dfs是指分布式频率搜索(Distributed Frequency Search) , 它告诉 Elasticsearch , 先分别获得每个分片本地的 IDF ,然后根据结果再计算整个索引的全局 IDF 。

多字段搜索:bool match查询

dis_max:分离最大化查询(Disjunction Max Query)

全文搜索被称作是 召回率(Recall) 与 精确率(Precision) 的战场 召回率 ——返回所有的相关文档; 精确率 ——不返回无关文档

TF/IDF

字段中心式(term-centric)查询:best_fields 和 most_fields 词中心式(term-centric)的查询:cross_fields

自定义单字段查询是否能够优于多字段查询,取决于在多字段查询与单字段自定义 _all 之间代价的权衡, 即哪种解决方案会带来更大的性能优化就选择哪一种。

multi_match 查询中避免使用 not_analyzed 字段。

短语匹配(match_phrase查询)

http://people.apache.org/~mikemccand/lucenebench/

unigrams ,bigrams(shingles),trigrams,

shingles 不仅比短语查询更灵活,而且性能也更好。 shingles 查询跟一个简单的 match 查询一样高效,而不用每次搜索花费短语查询的代价。

prefix 查询不做相关度评分计算,它只是将所有匹配的文档返回,并为每条结果赋予评分值 1 。它的行为更像是过滤器而不是查询。

search-as-you-type:match_phrase_prefix

Boolean Model:只是在查询中使用 AND 、 OR 和 NOT

Lucene 使用 布尔模型(Boolean model) 、 TF/IDF 以及 向量空间模型(vector space model) ,然后将它们组合到单个高效的包里以收集匹配文档并进行评分计算。

bool 查询实现了布尔模型

查询时的权重提升 是可以用来影响相关度的主要工具

constant_score 和 function_score区别?

一致随机评分(consistently random scoring)

similarity算法:BM25,TF-IDF

全文搜索是一场 查准率 与 查全率 之间的较量—查准率即尽量返回较少的无关文档,而查全率则尽量返回较多的相关文档。

es多字段,可对字段建立两次索引。文本做两次索引

stem 词干 stemmer 词干提取器

多语言设计:index-per-language, field-per-language

Elasticsearch 为什么不能在 analyzed (分析过)的字符串字段上排序,并演示了如何为同一个域创建 复数域索引 , 其中analyzed域用来搜索,not_analyzed域用来排序。

Analyzed 域无法排序并不是因为使用了分析器,而是因为分析器将字符串拆分成了很多词汇单元,就像一个 词汇袋 , 所以 Elasticsearch 不知道使用那一个词汇单元排序。

analyzed name域用来搜索。 not_analyzed name.raw 域用来排序。

International Components for Unicode (ICU)

单词还原成词根。比如foxes还原成fox.

Internals

https://www.elastic.co/blog/found-elasticsearch-internals https://www.elastic.co/blog/found-elasticsearch-networking

遇到问题

1.forbidden-12-index-read-only-allow-delete-api?

某个节点的磁盘满了,需要运维清理下磁盘

Course

https://github.com/xiaozhiliaoo/geektime-ELK

https://github.com/xiaozhiliaoo/search-practice

LogStash

笔记

Jordan Sissel

persistent queues

Dead Letter Queue

kibana

笔记

Rashid Khan

beat

reference

https://www.elastic.co/guide/en/beats/libbeat/8.0/beats-reference.html

Lucene

Book

Elasticsearch权威指南

Progress

100%

Notes

Elasticsearch: The Definitive Guide: A Distributed Real-Time Search and Analytics Engine 1st Edition

TF-IDF:词频/逆向文档频率

es两个阶段:索引和查询

stemmer:词干提取器

synonyms:同义词

字典词干提取器,算法化词干提取器

实践中一个好的算法化词干提取器一般优于一个字典词干提取器。

Hunspell 词干提取器(拼写检查)

保留停用词最大的缺点就影响搜索性能。

索引结构:Terms dictionary,Postings list,Term frequency,Positions,Offsets,Norms

common_grams 过滤器是针对短语查询能更高效的使用停用词而设计的。

Fuzzy matching 允许查询时匹配错误拼写的单词。原理是编辑距离

fuzzy 查询的工作原理是给定原始词项及构造一个 编辑自动机— 像表示所有原始字符串指定编辑距离的字符串的一个大图表。

聚合=bucket+metrics bucket=group by metrics=count,sum,max...

<国家, 性别, 年龄> 组合的平均薪酬。所有的这些都在一个请求内完成并且只遍历一次数据.

直方图聚合:histogram,date_histogram

例子基础数据 POST /cars/transactions/_bulk { "index": {}} { "price" : 10000, "color" : "red", "make" : "honda", "sold" : "2014-10-28" } { "index": {}} { "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" } { "index": {}} { "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18" } { "index": {}} { "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-02" } { "index": {}} { "price" : 12000, "color" : "green", "make" : "toyota", "sold" : "2014-08-19" } { "index": {}} { "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" } { "index": {}} { "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01" }

聚合是在查询范围内的,但有时我们也想要搜索它的子集,而聚合的对象却是所有数据:全局桶

聚合结果过滤:过滤桶

只过滤搜索结果,不过滤聚合结果:post_filter

Approximate Aggregations(近似聚合):cardinality(HyperLogLog),percentiles(TDigest-https://github.com/tdunning/t-digest)

max是精确聚合,count(DISTINCT)是近似聚合

big data, exactness, and real-time latency.

精确+实时:数据可以存入单台机器的内存之中,我们可以随心所欲,使用任何想用的算法。结果会 100% 精确,响应会相对快速。 大数据+精确:传统的 Hadoop。可以处理 PB 级的数据并且为我们提供精确的答案,但它可能需要几周的时间才能为我们提供这个答案。 大数据+实时:近似算法为我们提供准确但不精确的结果。

聚合实现:DocValues

Doc values 的存在是因为倒排索引只对某些操作是高效的。 倒排索引的优势在于查找包含某个项的文档,而对于从另外一个方向的相反操作并不高效,即:确定哪些项是否存在单个文档里 搜索使用倒排索引查找文档,聚合操作收集和聚合 doc values 里的数据。 Doc Values 本质上是一个序列化的列式存储。列式存储 适用于聚合、排序、脚本等操作 Doc values 不支持 analyzed 字符串字段。聚合运行在 not_analyzed 字符串而不是 analyzed 字符串,这样可以有效的利用 doc values

聚合一个分析字符串:fielddata

terms 桶基于我们的数据动态构建桶;它并不知道到底生成了多少桶。 大多数时候对单个字段的聚合查询还是非常快的, 但是当需要同时聚合多个字段时,就可能会 产生大量的分组,最终结果就是占用 es 大量内存,从而导致 OOM 的情况发生。

聚合模式:DFS(默认),BFS

广度优先仅仅适用于每个组的聚合数量远远小于当前总组数的情况下,因为广度优先会在内存中缓存裁剪后的仅仅需要缓存的每个组的所有数据, 以便于它的子聚合分组查询可以复用上级聚合的数据。

对单个文件的变更是ACID的,但包含多个文档的变更不支持。

Elasticsearch实战

Progress

P47

深入理解Elasticsearch(原书第三版)

进度

阅读完成

Notes

prefix-query不会被分析,match query会被分析

es1.0多播发现机制,2.0单播默认发现机制。zen discovery

ingest等价于java版logstash节点

shrink api 索引收缩

查询数据:查询,过滤器

normalization 归一化

BM25和TF-IDF

es当作NOSQL,无分析查询会更好。也即精确查询。

基本查询:match,multi_match,common,fuzzy_like_this,fuzzy_like_this_field,geoshape,ids,match_all,query_string,simple_query_string range,prefix,regex,span_term,term,terms,wildcard

组合查询:bool,boosting,constant_score,dis_max,filtered,function_score,has_child,has_parent,indices,nested,span_first,span_multi, span_near,span_not,span_or,span_term,top_children

无分析查询:term,terms,common,ids,prefix,span_term,wildcard

全文检索:match,multi_match,query_string,simple_query_string

模式匹配:prefix,regexp,wildcard

相似度查询: fuzzy,fuzzy_like_this,fuzzy_like_this_filed,more_like_this,more_like_this_field

支持修改分数查询:boosting,constant_score,function_score,indices

位置敏感:match_pjrase,span_first,span_multi,span_near,span_not,span_or,span_term

结构敏感:nested,has_child,has_parent,top_children

查询改写:prefix,rewrite参数

es使用mustache实现查询模板

multi_match匹配类型:best_fields,most_fields,cross_fields,phrase,phrase_prefix

es管理关系型提供了:嵌套,父子文档结构

处理人类语言。

文档通过路由知道放在哪一个分片上面了。

集群主节点任务:节点间分配分片。

分片是一个lucence索引。

相似度模型(similarity):TF-IDF,BM25,DFR,DFI,IB,LM Dirichlet,LM JelinekMercer

store模块:lucene和io子系统的抽象:niofs,simplefs,mmapfs,fs

Searcher重新打开的过程:refresh,默认1s

flush:将事务日志同步到lucene index,同时清空事务日志。默认5s

实时读取:从事务日志中读取。

段合并耗费性能的操作。

并发合并调度器:ConcurrentMergeScheduler

NRT:近实时

发现模块:选主,发现新节点,形成和发现新节点过程叫发现。 Zen发现模块单播(unicast)发现集群中其他节点。

微服务发现是发现别人,但是自己发现不了。

es恢复过程:加载通过网关模块存储的数据以使得es正常工作。每当集群整体启动时候,恢复过程就会启动,加载所有数据:元数据,映射和全部索引。 网关模块存储es正常运行的全部数据。

es可以备份到s3,hdfs,gcs,azure

ingest处理器,内置23个等。append,convert,grok,fail等等。。。grok自带了120多种grok模式。

联盟搜索:跨集群搜索,通过部落节点

部落节点从所有连接的集群中获取集群状态,合并成一个集群状态。

percolation(过滤),suggester(联想,建议器)

查询验证 validate api

查询分析器:profile api

dfs query then fetch

发现集群变慢,阻塞等情况时候,可以看到热点线程api,看哪些线程耗费cpu,io等信息 hot_threads

当数据量够大,查询够复杂时候,就会碰到内存问题,此时增加更多内存也无济于事。

增加主分片:内存溢出,分片查询时间过长,内存交换,高IO等待等问题 增加副本分片:流量过高节点无法处理的时候,增加查询能力。

集群缺少部分节点数据,比完全不响应查询要好。

防止分片和副本部署在同一个节点,此时失去了高可用能力了。使用awareness能力。

集群部署:两个聚合节点,n个数据节点,三个候选主节点,冗余3个。设置minimum_master_nodes:2,可以避免脑裂发生。

索引刷新频率:文档需要多久才能出现在搜索结果里面。默认1s,意味着每1s索引查询器重新打开一次。

查询:总应该思考最优的查询结构,过滤器使用等。

不使用路由情况下,es会查询所有分片。如果知道文档在哪一个路由里面,将会提高效率。

索引只在一个分片上面,查询性能较差,增加副本分片对性能提高无用,需要将索引分为多个分片。把数据平均负载。

聚合查询:size:最后聚合结果返回多少组数据,shard_size:每个分片返回多少组数据。 降低size和shard_size会让聚合结果不那么准确,但是网络开销小,内存使用低。

过多的副本会导致索引速度下降。

es 预写日志WAL,tranlog,get请求获取最新的数据,确保数据持久化,优化lucene index的写入。

SSD优于HDD

基于时间的索引管理:shrink和rollover shrink api 减少主分片,生成新的索引。只有只读才能收缩。

Elasticsearch源码解析与优化实战

Gradle-TODO

Gradle简介

Book

Gradle实战

Druid-TODO

Druid简介

Spring

Spring Core

container:configuration model + dependency injection

Lightweight and minimally invasive development with POJOs Loose coupling through DI and interface orientation Declarative programming through aspects and common conventions Eliminating boilerplate code with aspects and templates

Spring一个包里面的support都是接口的便利实现。

核心对象

BeanWrapper,PropertiesEditor

BeanFactory,XmlBeanFactory,ListableBeanFactory,DefaultListableBeanFactory

BeanDefinition

ApplicationContext,WebApplicationContext,XmlWebApplicationContext

ContextLoader/ContextLoaderListener/ContextLoaderServlet

ApplicationContext:ClassPathXmlApplicationContext,FileSystemXmlApplicationContext,GenericGroovyApplicationContext 最灵活的方式:GenericApplicationContext

metadata -> bean definition -> bean factory -> applcation context

xxxTemplate(JDBC,JMS)

Resource,AbstractResource,ResourceLoader

BeanDefinitionRegistry,BeanDefinition,

异常处理:受检成运行,捕获能处理

Spring源码先看Interface,在看Interface继承关系,在看AbstractInterface

AbstractInterface里面的protected方法和属性需要注意。没有protected属性,说明不是为了继承而设计的。

spring aop的实现?

spring ioc的实现?

spring 事务的实现?

IOC

instantiating, configuring, and assembling the beans

configuration metadata:XML, Java annotations, or Java code

bean definition:Class,Name,Scope,Constructor arguments,Properties,Autowiring mode,Lazy initialization mode,Initialization method ,Destruction method

autowiring collaborators

FactoryBean 自定义Bean创建 ProxyFactoryBean 代理包装Bean TransactionProxyFactoryBean 事务包装代理

dependencies are on interfaces or abstract base classes, which allow for stub or mock implementations to be used in unit tests

uml里面有association和dependency. 而dependency和ioc dependency不一样,ioc dependency是所有有关联的对象,而不在乎对象来自哪里。 结合起来看,就是依赖注入(IOC)成关联(UML)。

public class A {

}

public class B {

}

public class C { //此时没有办法注入A,B public void doTask(A a,B b){} }

main() { C c = new C(); c.doTask(new A(), new B()); }

这个依赖放在了方法上面了,

AOP

Aspect:跨多个类的模块化横切点 Join point:程序执行的地点。 Advice:程序执行的地方发生的操作。 Pointcut:满足程序执行地点的条件。 Introduction:代表类型声明其他方法或字段 Target object: 被多个Aspect Advice的对象 AOP proxy: Weaving:

join points matched by pointcuts is the key to AOP

auto-proxying

aop is proxy-based frameworks

动态代理失效: AspectJ does not have this self-invocation issue because it is not a proxy-based AOP framework.

Pointcut以及实现.

创建代理方式: Dependency on Spring IoC:ProxyFactoryBean是Spring创建代理的Bean,ProxyFactoryBean通过 Dependency on Programmatically: ProxyFactory Dependency on Auto-proxy:BeanNameAutoProxyCreator,DefaultAdvisorAutoProxyCreator

java代理发展:静态,动态。spring aop框架(可单独使用) java依赖管理:手动,spring ioc容器(可单独使用)

TransactionProxyFactoryBean

Data Access

JDBC,DAO(二级抽象),ORM(JPA,Hibernate),Spring-Data-JDBC,spring-boot-starter-data-jdbc 一级(低级别抽象):一个控制JDBC工作流程和错误处理的框架。 org.springframework.jdbc.core 核心JdbcTemplate 二级(高级别抽象):RDBMS操作建模Java对象操作 - org.springframework.jdbc.object 核心RdbmsOperation ORM:org.springframework.orm

mybatis,spring-mybatis,spring-mybatis-starter

AOP:Advisor=Advice+PointCut

AOP

JavaBean+ProxyFactory=Proxy JavaBean

JavaBean+ProxyFactory+TransactionInterceptor=Proxy Transaction JavaBean

JavaBean+TransactionProxyFactoryBean=Proxy Transaction JavaBean

其他感悟

Spring从来不用别人接口,都是自己定义接口。

Spring源码先读接口和实现类,此垂直为类之职责。在读关联,此水平为类之交互。

Spring WebMVC

SpringWeb

SpringWebMVC

Spring Test

https://github.com/xiaozhiliaoo/spring-test-practice

Spring Boot

https://docs.spring.io/spring-boot/docs/current/reference/html/index.html

看到这里了:https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.testing.spring-boot-applications.spring-mvc-tests

Notes

Core

Convention_over_configuration

embed,starter,autoconfigure,production-ready features,no code generation

Project Structure

spring-boot-starter-xxx

spring-boot-xxx-autoconfigure

spring-boot-xxx

springboot的结构 spring-boot-starter spring-boot-autoconfigure spring-boot

springboot-test结构 spring-boot-starter-test(spring-boot-starter) spring-boot-test-autoconfigure spring-boot-test

springboot-actuator结构 spring-boot-starter-actuator(spring-boot-starter) spring-boot-actuator-autoconfigure spring-boot-actuator

Configuration Metadata

配置

Features

Liveness State Readiness StateL

Spring’s Environment abstraction

Database版本化管理:MyBatis Migrations,liquibase,flyway

https://metrics.ryantenney.com/ Spring integration for Dropwizard Metrics

Actuator(运维能力)

文档:https://docs.spring.io/spring-boot/docs/current/reference/html/actuator.html

数据通过http/jmx接口暴露

spring-boot-actuator

spring-boot-starter-actuator

spring-boot-actuator-autoconfigure

Health:HealthContributor,HealthIndicator

Information:InfoContributor

Endpoint:和应用检测和交互的点。提供了很多内置的Endpoint.

Actuator Web API:https://docs.spring.io/spring-boot/docs/2.6.4/actuator-api/htmlsingle/#overview

BeansEndpoint,BeansEndpointAutoConfiguration

Endpoint可以通过Http或者JMX暴露出去,这个是怎么实现的呢?

dropwizard,micrometer(Spring Boot 2: Migrating From Dropwizard Metrics to Micrometer)

Auto Configuration

spring-boot-autoconfigure

疑问:

starter里面的autoconfigure是怎么配置第三方库的呢?

mybatis-spring-boot-starter mybatis-spring-boot-autoconfigure mybatis-spring mybatis

spring的autoconfigure的配置类不在原来库代码里面,这是怎么做到的呢?而common-xxx配置类在库实现里面。 以上例子是经典例子, mybatis-spring配置了mybatis,从而在FactoryBean(Spring)获取SqlSessionFactory(MyBatis),是真实的创建了MyBatis的对象的, 但是mybatis-spring-boot-autoconfigure配置了mybatis,也创建对象。这就是自动装配。这一步是通过spring.factories文件实现的。

Spring的配置Bean概念,Configuration(配置的Bean),ConfigurationProperties(配置Bean的属性),autoconfigure指的是自动装配Bean.而在自动 装配过程中,用到了Properties

Spring Boot的设计是加载应用级配置,随后再考虑自动配置类。

单元测试专注于单一组件或组件中的一个方法,此处并不一定要使用Spring。Spring提供了 一些优势和技术——松耦合、依赖注入和接口驱动设计。这些都简化了单元测试的编写。但Spring 不用直接涉足单元测试,集成测试会涉及众多组件,这时就需要Spring帮忙了。

Spring Boot和Grails

HttpTrace默认的类,InMemoryHttpTraceRepository,100条,持久化实现需要继承HttpTraceRepository类

深入Spring Boot应用程序的内部细节:Actuator

ConfigurationProperties和Environment区别?

程序的配置,传入到

Spring Boot Admin

https://codecentric.github.io/spring-boot-admin/

注册方式:Admin Client或者Discovery Client

服务通过Admin Cli ent注册到Admin Server上了。

Service引入Admin Client把自己ip注册到Admin Service,然后Admin Service读取Admin Client的actuator接口?

Admin Client注册的类:ApplicationRegistrator,注册到Admin Server的InstancesController#register方法了。

Admin Server提供了三个Controller来接受请求,InstancesController,ApplicationsController,InstancesProxyController

Admin Server通过InstanceWebClient调用Service. 从而对Service进行控制。

核心实体类:Application,Instance,InstanceEvent(注册,不注册,状态更新,),Registration,InstanceEventStore

注册信息存在了哪里?InstanceEventStore,默认注册在内存中了。

基于事件的注册,将事件注册到内存中。

Spring Cloud Common

https://docs.spring.io/spring-cloud-commons/docs/current/reference/html/

Core Class

SpringApplication SpringApplicationBuilder BootstrapConfiguration

DiscoveryClient ServiceRegistry LoadBalancerClient

Notes

Cloud核心包: Spring Cloud Context and Spring Cloud Commons and Spring Cloud Load Balancer and Circuit breaker

child contexts inherit property sources and profiles from their parent

common patterns: service discovery, load balancing, and circuit breakers

springcloud common loadbalance and spring cloud loadbalance 区别是什么?

common里面提供的接口,并没有提供实现,而spring cloud loadbalance提供了实现。 service discovery/register 实现:consul,zk,eureka load balancing 实现:spring cloud loadbalance circuit breaker实现:Resilience4J,Sentinel,Spring Retry(Spring Cloud Circuit Breaker)

一些新的endpoint,引入了spring-boot-actuator,refresh,restart,pause,env

Refresh入口类是:RefreshEndpoint,RefreshScope实现了GenericScope,GenericScope实现了Scope类, 也就是@RefreshScope是@Scope("refresh")

Spring Cloud Consul

https://cloud.spring.io/spring-cloud-consul/reference/html/

lookup service:

Spring Cloud Config

https://cloud.spring.io/spring-cloud-config/reference/html/

Core Class

Environment

EnvironmentRepository实现类JGitEnvironmentRepository ,通过Eclipse JGit实现

Notes

The Config Server runs best as a standalone application

other external property sources

Config Client:Config First Bootstrap,Discovery First Bootstrap

Spring Cloud Gateway

Spring Cloud OpenFeign

https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/

Spring Integration

Spring Others

Book

Spring实战第四版

SpringBoot解密

KVStore

Redis

The world’s most loved real‑time data platform

MapDB

分布式事务

atomikos

seata

Book

深入理解分布式事务

正本清源分布式事务之Seata

Library

apache-httpcomponents

https://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/fluent.html

Logback

apache-poi

Server-Sent-Events

https://en.wikipedia.org/wiki/Server-sent_events

https://www.youtube.com/watch?v=2To3_mYT2hc&t=8s

https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events

https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#mvc-ann-async-sse

dom4j

lombok

@Cleanup

logstash-logback-encoder

https://github.com/logfellow/logstash-logback-encoder

jmx

spring对jmx的处理:

step1:在spring-framework里面封装jmx,MBeanExporter,AnnotationMBeanExporter

step2:在spring-autoconfigure中自动配置jmx,JmxAutoConfiguration

step3:在spring-boot-actuator中通过JmxEndpointExporter将endpoint暴露出去,endpoint本身可以通过http和jmx暴露。 spring-boot-actuator-autoconfigure中配置JmxEndpointAutoConfiguration

step4:在spring-boot-admin中通过jolokia暴露所有jmx(endpoint和其他jmx)

https://docs.oracle.com/javase/tutorial/jmx/overview/index.html

https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jmx

https://docs.spring.io/spring-boot/docs/current/reference/html/actuator.html#actuator.jmx

https://codecentric.github.io/spring-boot-admin/2.5.1/#jmx-bean-management

单元测试

junit5

参数化测试:减少样板代码

API: ExecutionCondition

JUnit Jupiter’s org.junit.jupiter.api.Assertions class does not provide an assertThat() method like the one found in JUnit 4’s org.junit.Assert

Assumptions的作用是什么?

mode:per-class,per-method

@Nested:测试之间的关系以及测试结构

assertj

mockito

Mockito

Mockito

when...thenXXX(not void) ... invoke when...thenAnswer ... ... invoke

doXXX...when (void)... invoke doAnswer...when(void)... invoke

doAnswer使用场景是什么?测试有参数无返回值,无参数无返回值.

BDDMockito

BDDMockito

given...willReturn

verify

Argument matchers ArgumentCaptor

@Mock @Spy @Captor @InjectMocks

SpringBoot: @MockBean @SpyBean

@Mock和@MockBean区别是什么? @Mock和@InjectMocks区别是什么? doAnswer.when和when thenReturn和when thenAnswer和doReturn when区别区别? doAnswer.when返回空,就是void方法。

组合: @Mock和@InjectMock @MockBean和@Autowired

Use doAnswer() when you want to stub a void method with generic Answer.

jacoco

Book

有效的单元测试

Database

MySQL

Redis

资源地址

官方介绍文档

github地址

redis版本

版本的格式:major.minor.patch

Patch-Level versions

Patches primarily consist of bug fixes and very rarely introduce any compatibility issues.

Minor versions

Minor versions usually deliver maturity and extended functionality.

Major versions

Major versions introduce new capabilities and significant changes

版本的维护

Two additional versions receive maintenance only, meaning that only fixes for critical bugs and major security issues are committed and released as patches:

  • The previous minor version of the latest stable release.
  • The previous stable major release.

Mongdb

Canal

Doris

运维

Tracing

Monitoring

Logging

rancher

kubernetes

harbor

Reacher

aws

aliyun

Replication And Consistency(Not Consensus)

在分布式系统中,复制可用来实现Reliable, Scalable. 数据的复制会导致一致性问题。 数据的复制也即同步过程,会带来节点是否可用问题,这也是CAP中的AP权衡。 复制是一项基础技术,而一致性是这项技术带来的问题。可以认为没有复制便没有一致性问题。

Replication in database,kv,document-db,column-db,file system, distributed coordination,Framework

Replication Model https://en.wikipedia.org/wiki/Replication_(computing)

Consistency Model https://en.wikipedia.org/wiki/Consistency_model

Consistency Model https://jepsen.io/consistency

Database

mysql https://dev.mysql.com/doc/refman/8.0/en/replication.html

postgresql https://www.postgresql.org/docs/current/high-availability.html

mariadb https://mariadb.com/kb/en/standard-replication/

KV store

redis https://redis.io/topics/replication

etcd https://etcd.io/docs/v3.3/faq/

riak https://docs.riak.com/riak/kv/latest/learn/concepts/replication/index.html

tikv: https://tikv.org/deep-dive/scalability/introduction/

dynamo

consul https://www.consul.io/docs/architecture

Document

mongodb https://docs.mongodb.com/manual/replication/

couchdb https://docs.couchdb.org/en/stable/replication/intro.html https://guide.couchdb.org/editions/1/en/replication.html

Framework

hazelcast https://docs.hazelcast.com/imdg/4.2/consistency-and-replication/consistency

akka https://doc.akka.io/docs/akka/current/typed/cluster-concepts.html

Column DB

cassandra https://cassandra.apache.org/doc/latest/cassandra/architecture/overview.html

hbase https://hbase.apache.org/book.html#_cluster_replication

Message

kafka https://kafka.apache.org/documentation/#replication https://cwiki.apache.org/confluence/display/kafka/kafka+replication

rabbitmq https://www.rabbitmq.com/ha.html

rocketmq https://rocketmq.apache.org/docs/rmq-deployment/

activemq https://activemq.apache.org/clustering

elasticsearch https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html

solr https://solr.apache.org/guide/6_6/index-replication.html

File System

hdfs https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html#Data+Replication

ceph

GFS

Coordination

zookeeper: https://zookeeper.apache.org/doc/r3.2.2/zookeeperInternals.html

Distributed database(newsql)

yugabyte https://docs.yugabyte.com/latest/architecture/docdb-replication/replication/

CockroachDB https://www.cockroachlabs.com/docs/stable/architecture/replication-layer.html

other

Aurora https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Aurora.Replication.html

Spanner/F1

rethinkdb https://rethinkdb.com/docs/architecture/

Windows Azure Storage https://docs.microsoft.com/en-us/azure/storage/common/storage-redundancy

voldemort https://www.project-voldemort.com/voldemort/design.html

Bigtable

Yahoo PNUTS

VoltDB https://docs.voltdb.com/UsingVoltDB/ChapReplication.php

ScyllaDB https://docs.scylladb.com/architecture/

foundationdb: https://apple.github.io/foundationdb/consistency.html https://apple.github.io/foundationdb/fault-tolerance.html

ApplicationReplication ModelConsistency Model

Partitioning&Sharding

分区是将大数据集拆分成小数据集的方法,拆分会带来两个问题,1 是寻找到分区信息Routing 2 是增减节点时候Rebalance分区

https://en.wikipedia.org/wiki/Shard_(database_architecture)

https://en.wikipedia.org/wiki/Partition_(database)

partition:hazelcast,kafka shard:MongoDB,ES,Solr region:Hbase,TiKV tablet:Bigtable vnode:Cassandra,Riak vBucket(virtual buckets):Couchbase slot:Redis ShardRegion:akka

System

Memcached

Redis:https://redis.io/topics/partitioning https://redis.io/topics/cluster-tutorial

Cassandra https://docs.datastax.com/en/cassandra-oss/3.0/cassandra/architecture/archDataDistributeAbout.html https://cassandra.apache.org/doc/latest/cassandra/architecture/dynamo.html#dataset-partitioning-consistent-hashing

DynamoDB

Google Spanner

Shardingsphere

rethinkdb:https://rethinkdb.com/docs/architecture

HBase:https://hbase.apache.org/book.html#manual_region_splitting_decisions https://hbase.apache.org/book.html#regions.arch

BigTable:

MongoDB:https://docs.mongodb.com/manual/sharding/

voldemort: https://www.project-voldemort.com/voldemort/rebalance.html https://www.project-voldemort.com/voldemort/design.html

Couchbase: https://docs.couchdb.org/en/stable/partitioned-dbs/index.html

MySQL:https://dev.mysql.com/doc/refman/8.0/en/partitioning.html

Riak:

voltdb:https://docs.voltdb.com/UsingVoltDB/IntroHowVoltDBWorks.php

HDFS:https://blog.cloudera.com/partition-management-in-hadoop/

Ketama https://www.metabrew.com/article/libketama-consistent-hashing-algo-memcached-clients

Request Routing

service discovery

https://helix.apache.org/

https://docs.mongodb.com/manual/core/sharded-cluster-config-servers/

https://github.com/couchbase/moxi

Rebalancing(Split&Merge)

StorageEngine

MySQL:https://dev.mysql.com/doc/refman/8.0/en/innodb-storage-engine.html

Postgre:https://wiki.postgresql.org/wiki/Future_of_storage

Mariadb:https://mariadb.com/kb/en/choosing-the-right-storage-engine/

cassandra: https://cassandra.apache.org/doc/latest/cassandra/architecture/storage_engine.html

riak:https://docs.riak.com/riak/kv/2.2.3/setup/planning/backend/bitcask/index.html

consul,etcd:blot

leveldb:https://github.com/google/leveldb/blob/main/doc/impl.md

rocksdb:http://rocksdb.blogspot.com/

hbase:https://hbase.apache.org/book.html#inmemory_compaction

elasticsearch:lucene,https://www.elastic.co/cn/blog/found-dive-into-elasticsearch-storage

solr:lucene

lucene:https://blog.mikemccandless.com/2011/02/visualizing-lucenes-segment-merges.html

lmdb:http://www.lmdb.tech/doc/

voltdb:

memsql:

ramcloud:

redis:

mongodb:https://docs.mongodb.com/manual/core/storage-engines/

couchbase:

OLAP:Teradata,Vertica,SAP HANA,ParAccel,Amazon RedShift

Apache Hive,SparkSQL,Cloudear Impala,Facebook Presto,Aoache Tajo,Apache Drill,Google Gremel

Network Partition

https://en.wikipedia.org/wiki/Network_partition

https://en.wikipedia.org/wiki/Split-brain

akka:https://doc.akka.io/docs/akka-enhancements/current/split-brain-resolver.html

hazelcast: https://docs.hazelcast.com/imdg/4.2/network-partitioning/split-brain-protection https://hazelcast.com/blog/jepsen-analysis-hazelcast-3-8-3/

es: https://www.elastic.co/guide/en/elasticsearch/reference/8.0/modules-discovery-quorums.html https://www.elastic.co/guide/en/elasticsearch/reference/8.0/modules-discovery-voting.html https://www.elastic.co/guide/en/elasticsearch/reference/8.0/high-availability-cluster-design-large-clusters.html https://www.elastic.co/guide/en/elasticsearch/reference/8.0/high-availability-cluster-small-clusters.html

CouchDB: https://guide.couchdb.org/editions/1/en/conflicts.html

rabbitmq: https://www.rabbitmq.com/partitions.html

Transaction

https://en.wikipedia.org/wiki/Isolation_(database_systems)

aerospike:https://aerospike.com/blog/developers-understanding-aerospike-transactions/

datomic: https://docs.datomic.com/on-prem/transactions/transactions.html

MySQL:https://dev.mysql.com/doc/refman/8.0/en/innodb-locking-transaction-model.html

PostgreSQL:https://www.postgresql.org/docs/14/mvcc.html K Oracle

SQL Server2012: https://docs.microsoft.com/en-us/sql/relational-databases/sql-server-transaction-locking-and-row-versioning-guide?view=sql-server-ver15#database-engine-isolation-levels

FoundationDB:https://apple.github.io/foundationdb/transaction-manifesto.html

Access

MemSQL(singlestore):https://dbdb.io/db/singlestore

Index

https://en.wikipedia.org/wiki/Database_index

ResiliencyPatterns

Retry

https://github.com/elennick/retry4j

Guava Retry

Spring Retry

resilience4j-retry

https://failsafe.dev/

ServiceRegistry

https://microservices.io/patterns/service-registry.html

服务注册和发现是分布式系统里面的Naming部分,并不是Communication部分。Communication是Feign做的。

nacos,zookeeper,etcd,consul,eureka

nacos:一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。

zookeeper:Apache ZooKeeper is an effort to develop and maintain an open-source server which enables highly reliable distributed coordination.

etcd:A distributed, reliable key-value store for the most critical data of a distributed system

consul:Consul is a service networking solution to automate network configurations, discover services, and enable secure connectivity across any cloud or runtime.

eureka:Eureka is a RESTful (Representational State Transfer) service that is primarily used in the AWS cloud for the purpose of discovery, load balancing and failover of middle-tier servers. It plays a critical role in Netflix mid-tier infra.

Person

Phil Bernstein

Peter Bailis: HAT,Consistency Model

技术分享

Kafka-Architect

内容列表

kafka引入

AKF原则

kafka架构

代码演示

可靠性保证

kafka引入

kafka是什么?

官网介绍

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

confluent介绍

Apache Kafka is an event streaming platform used to collect, process, store, and integrate data at scale. It has numerous use cases including distributed streaming, stream processing, data integration, and pub/sub messaging.

重点:不只是分布式消息中间件,还是分布式流式消息(事件)处理平台

kafka的系统演进

单体到分布式,引入的问题:可靠性、可扩展性、高性能

AKF原则

AKF 立方体也叫做scala cube,它在《The Art of Scalability》一书中被首次提出,旨在提供一个系统化的扩展思路。AKF 把系统扩展分为以下三个维度:

  • X 轴:直接水平复制应用进程来扩展系统。(比如单体服务,但部署时一发动全身)
  • Y 轴:将功能拆分出来扩展系统。(拆分微服务,但好的业务迟早会遇见数据瓶颈)
  • Z 轴:基于用户信息扩展系统。(数据分区,比如分库分表)

分布式AKF原则

Kafka AKF分析

x轴,对parttion进行副本备份,副本(理论上可以读写分离,但容易出现一致性问题,干脆只能在主P上进行读写)

y轴:topic,不同的业务使用不同topic

z轴:partiton,对无关的数据打散到不同的分片,分而治之。将相关的数据按顺序聚合到同一个分片

Kafka数据处理思路

无关的数据,必然分治——>无关的数据就分散到不同的分区里,以追求并发并行

有关的数据,聚合——> 有关的数据,一定要按原有顺序发送到同一分区里

分区内部有序,分区外部无序

kafka架构

核心组件:ZookeeperBrokerTopicPartitionProducerConsumer(group)

Zookeeper

单机管理->主从集群 分布式协调,zk不用于存储(相当于服务网关,负责路由转发)

旧版本

producer是通过zookeeper获取集群节点信息的,zk除了是个协调以外,还是个存储DB,offset是维护在zookeeper当中

新老版本的区别

角色之间通信,producer和consumer面向的是broker,在业务层次上不再依赖zookeeper(减少zk的负载),只是个协调

元数据Metadata:topic,partition,broker

kafka架构图

整个架构

核心组件的关系

broker与zookeeper的关系?

broker与partition的关系?

controller是个啥?

创建一个topic的过程

kafka创建topic内部结构

Producer

在并发情况下,注意一致性(顺序性保证)的问题

lock() { 
		sql
		producer.produce()
} unlock();

生产的数据保存在哪里?

kafka的broker的partition里

Consumer

consumer与patition的关系:1:n/ 1:1

思考,consumer与patition的关系n:1可不可以?

破坏有序性

group

不同业务组之间,需要消费同一topic的数据,可以使用不同的group

在单一的使用场景下,先要保证,即便追求性能,用多个consumer,应该注意,不能一个分区由多个consumer消费

数据的重复利用是站在group上

offset

比如consumer重启,会不会导致数据重复消费和丢失,围绕的是消费的进度offset

起初consumer在runtime里维护自身的consumer,旧版本的offset是通过consumer与zookeeper通信维护的

新版kafka能自己维护offset,维护在topic当中,consumer->broker(runtime)->mem metadata->磁盘,持久层

offset的两种提交方式

  1. 自动提交:每间隔5s,先处理业务逻辑,异步提交offset,重复消费

  2. 手动提交:处理业务逻辑,同步提交offset

自动异步提交时(默认每5s提交一次),导致的问题?

  1. 重复消费&消息丢失
  2. 场景
    1. 还没到时间,挂了,没提交,重起一个consumer,参照offset的时候,会重复消费。
    2. 一个批次的数据还没写数据库成功,但是这个批次的offset被异步提交了,挂了,重起一个consumer,参照offset的时候,会导致消息丢失。

代码演示

实战

生产者消费者代码

可靠性保证

可靠性是系统的⼀个属性,⽽不是⼀个独⽴的组件,所以在讨论 Kafka 的可靠性保证时,还是要从系统的整体出发。

可靠性保证

只有当消息被写⼊分区的所有同步副本时(但不⼀定要写⼊磁盘),它才被认为是“已提交”的。

broker配置

broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。

复制系数

replication.factor,主题级别的配置参数,因为 Kafka 的默认复制系数就是 3——不过⽤户可以修改它。即使是在主题创建之后,也可以通过新增或移除副本来改变复制系数。

如果复制系数为 N,那么在 N-1 个 broker 失效的情况下,仍然能够从主题读取数据或向主题写⼊数 据。所以,更⾼的复制系数会带来更⾼的可⽤性、可靠性和更少的故障。另⼀⽅⾯,复制系数 N 需要 ⾄少 N 个 broker,⽽且会有 N 个数据副本,也就是说它们会占⽤ N 倍的磁盘空间。我们⼀般会在可⽤性和存储硬件之间作出权衡

副本的分布也很重要。默认情况下,Kafka 会确保分区的每个副本被放在不同的 broker 上。

不完全的⾸领选举

unclean.leader.election.enable只能在 broker 级别(实际上是在集群范围内)进⾏配置,它的默认值是 true。

我们之前提到过,当分区⾸领不可⽤时,⼀个同步副本会被选为新⾸领。如果在选举过程中没有丢失数据,也就是说提交的数据同时存在于所有的同步副本上,那么这个选举就是“完全”的。 但如果在⾸领不可⽤时其他副本都是不同步的,我们该怎么办呢?

简⽽⾔之,如果我们允许不同步的副本成为⾸领,那么就要承担丢失数据和出现数据不⼀致的⻛险。 如果不允许它们成为⾸领,那么就要接受较低的可⽤性,因为我们必须等待原先的⾸领恢复到可⽤状态。

如果把 unclean.leader.election.enable 设为 true,就是允许不同步的副本成为⾸领(也就 是“不完全的选举”),那么我们将⾯临丢失消息的⻛险。如果把这个参数设为 false,就要等待原先的⾸领重新上线,从⽽降低了可⽤性。

两种场景:银行系统,信用卡支付事务/实时点击流分析系统

最少同步副本

在主题级别和 broker 级别上,这个参数都叫 min.insync.replicas。

根据 Kafka 对可靠性保证的定义,消息只有在被写⼊到所有同步副本之后才被认为是已提交的。

对于 ⼀个包含 3 个副本的主题,如果 min.insync.replicas 被设为 2,那么⾄少要存在两个同步副本 才能向分区写⼊数据。 如果 3 个副本都是同步的,或者其中⼀个副本变为不可⽤,都不会有什么问题。不过,如果有两个副 本变为不可⽤,那么 broker 就会停⽌接受⽣产者的请求。尝试发送数据的⽣产者会收到 NotEnoughReplicasException 异常。消费者仍然可以继续读取已有的数据。实际上,如果使⽤ 这样的配置,那么当只剩下⼀个同步副本时,它就变成只读了,这是为了避免在发⽣不完全选举时数 据的写⼊和读取出现⾮预期的⾏为。为了从只读状态中恢复,必须让两个不可⽤分区中的⼀个重新变 为可⽤的(⽐如重启 broker),并等待它变为同步的。

在可靠的系统⾥使⽤⽣产者

  1. 根据可靠性需求配置恰当的 acks 值。
  2. 在参数配置和代码⾥正确处理错误。

acks(0,1,all)

⼀般情况下,如果你的⽬标是不丢失任何消息,那么最好让⽣产者在遇到可重试错误时能够保持重试。

重试发送⼀个已经失败的消息会带来⼀些⻛险,如果两个消息都写⼊成功,会导致消息重 复。重试和恰当的错误处理可以保证每个消息“⾄少被保存⼀次”,但当前 的 Kafka 版本(0.10.0)⽆法保证每个消息“只被保存⼀次”。

在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture)或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  1. 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  2. 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  3. 精确一次(exactly once):消息不会丢失,也不会被重复发送。

使⽤⽣产者内置的重试机制可以在不造成消息丢失的情况下轻松地处理⼤部分错误,不过对于开发⼈员来说,仍然需要处理其他类型的错误,包括:

  • 不可重试的 broker 错误,例如消息⼤⼩错误、认证错误等;
  • 在消息发送之前发⽣的错误,例如序列化错误;
  • 在⽣产者达到重试次数上限时或者在消息占⽤的内存达到上限时发⽣的错误
  1. 丢弃“不合法的消息”?
  2. 把错误记录下来?
  3. 把这些消息保存在本地 磁盘上?
  4. 回调另⼀个应⽤程序?

具体使⽤哪⼀种逻辑要根据具体的架构来决定。只要记住,如果错误处理只是为了重试发送消息,那么最好还是使⽤⽣产者内置的重试机制。

在可靠的系统⾥使⽤消费者

group.id:消费者组id

auto.offset.reset:(earliest/latest)

enable.auto.commit:(自动提交/手动提交)

auto.commit.interval.ms:自动提交的时间间隔

手动提交offset

错误重试,超过重试次数,就写入三方组件

仅⼀次传递:幂等性写⼊

总结

可靠性并不只是 Kafka 单⽅⾯的事情,我们应该从整个系统层⾯来考虑 可靠性问题,包括应⽤程序的架构、⽣产者和消费者 API 的使⽤⽅式、⽣产者和消费者的配置、主题 的配置以及 broker 的配置。

系统的可靠性需要在许多⽅⾯作出权衡,⽐如复杂性、性能、可⽤性和 磁盘空间的使⽤。掌握 Kafka 的各种配置和常⽤模式,对使⽤场景的需求做到⼼中有数,你就可以在 应⽤程序和 Kafka 的可靠性程度以及各种权衡之间作出更好的选择。

参考资料

kafka权威指南

官方文档

chatgpt写的一篇kafka权威指南总结

参考资料

Akka

优先看:https://academy.lightbend.com/

其次:classic: https://doc.akka.io/docs/akka/current/index-classic.html

typed:https://doc.akka.io/docs/akka/current/

API文档:https://doc.akka.io/api/ JAPI:https://doc.akka.io/japi/akka/2.6/akka/actor/AbstractActor.html

AKKA中文指南:https://www.bookstack.cn/read/guobinhit-akka-guide/711c3ba85a66f758.md

http://jasonqu.github.io/akka-doc-cn/2.3.6/scala/book/index.html

快速开始demo:https://developer.lightbend.com/start/?group=akka

代码示例:https://github.com/akka/akka-samples

https://github.com/typesafehub/activator-akka-java-spring https://github.com/lightbend

Akka 响应式架构 https://book.douban.com/subject/26829063/

Akka实战:Akka实战:快速构建高可用分布式应用 https://book.douban.com/subject/30218333/

Akka入门与实践 https://book.douban.com/subject/27055163/

Akka实战 https://book.douban.com/subject/30431012/

TTT:【技术分享】Akka入门与精品课教务系统分布式实战 http://training.corp.youdao.com/play.php?id=139

官方文档 https://doc.akka.io/docs/akka/current/typed/guide/index.html

akka生态:https://akka.io/docs/

https://developer.lightbend.com/docs/akka-platform-guide/index.html

使用akka开发微服务:https://developer.lightbend.com/docs/akka-platform-guide/microservices-tutorial/index.html

lightbend academy: https://academy.lightbend.com/courses

EBOOK: https://www.lightbend.com/akka-platform/resources

Consul

官方文档 https://www.consul.io/docs

生态:https://www.hashicorp.com/

Hazelcast

优先看:https://training.hazelcast.com/

官方文档IMDG https://hazelcast.org/imdg/docs/

https://docs.hazelcast.com/imdg/latest/getting-started.html

hazelcast文档:https://docs.hazelcast.com/hazelcast/5.1/

hazelcast mc文档:https://docs.hazelcast.com/management-center/latest/getting-started/overview

代码示例:https://github.com/hazelcast/hazelcast-code-samples

https://github.com/hazelcast/hazelcast/wiki

Quick Start:https://hazelcast.org/imdg/?samplelang=Java+Member&sampleindex=0

Mastering Hazelcast https://odoepner.files.wordpress.com/2015/04/mastering_hazelcast1.pdf

最新版本:https://hazelcast.com/resources/mastering-hazelcast/

官方文档PDF https://docs.hazelcast.org/docs/4.0.1/manual/pdf/index.pdf

Jackson

https://github.com/FasterXML/jackson-docs

https://github.com/FasterXML/jackson

https://github.com/FasterXML

ElasticSearch

优先看oreilly出版的《ElasticSearch权威指南》
中文版:https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html(中文有些esquery写的不对,需要辅助英文的看) 英文版:https://www.elastic.co/guide/en/elasticsearch/guide/master/index.html

https://www.elastic.co/guide/index.html

ElasticSearch Elasticsearch源码解析与优化实战 https://book.douban.com/subject/30386800/

Elasticsearch实战 https://book.douban.com/subject/30380439/

深入理解ElasticSearch https://book.douban.com/subject/27066928/

官方文档 https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html

极客时间 62.极客时间-Elasticsearch核心技术与实战

模型:https://dbdb.io/db/elasticsearch

Elasticsearch:权威指南 : https://www.elastic.co/guide/cn/elasticsearch/guide/2.x/index.html

Elasticsearch: The Definitive Guide: A distributed real-time search and analytics engine

IR:https://nlp.stanford.edu/IR-book/information-retrieval-book.html

Kafka

优先看:https://developer.confluent.io/ course:https://developer.confluent.io/learn-kafka/ 其次:https://kafka.apache.org/ 以及 https://cwiki.apache.org/confluence/display/KAFKA/Index

API:https://kafka.apache.org/30/javadoc/index.html

https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Kafka 深入理解Kakfa核心设计与实践原理 https://book.douban.com/subject/30437872/

Apache Kafka源码剖析 https://book.douban.com/subject/27038473/

Apache Kafka实战 https://book.douban.com/subject/30221096/

Kafka权威指南 https://book.douban.com/subject/27665114/

Kafka Streams实战 https://book.douban.com/subject/33425155/

官方文档 https://kafka.apache.org/documentation/#gettingStarted

https://jaceklaskowski.gitbooks.io/mastering-kafka-streams/content/

Canal

Canal 官方文档 https://github.com/alibaba/canal

Gradle

Gradle Gradle实战 https://book.douban.com/subject/26609447/ 官方文档 https://docs.gradle.org/current/userguide/userguide.html

MapDB

官方文档:https://mapdb.org/

https://dbdb.io/db/mapdb

mapdb作者youtube:https://www.youtube.com/user/jankotek

Seata

深入理解分布式事务 冰河

正本清源分布式事务之Seata

Github Repository

https://github.com/xiaozhiliaoo/akka/tree/akka-practice

https://github.com/xiaozhiliaoo/elasticsearch/tree/es-practice

https://github.com/xiaozhiliaoo/kafka/tree/kafka-practice

https://github.com/xiaozhiliaoo/hazelcast/tree/hazelcast-practice

参考资料

Akka

优先看:https://academy.lightbend.com/

其次:classic: https://doc.akka.io/docs/akka/current/index-classic.html

typed:https://doc.akka.io/docs/akka/current/

API文档:https://doc.akka.io/api/ JAPI:https://doc.akka.io/japi/akka/2.6/akka/actor/AbstractActor.html

AKKA中文指南:https://www.bookstack.cn/read/guobinhit-akka-guide/711c3ba85a66f758.md

http://jasonqu.github.io/akka-doc-cn/2.3.6/scala/book/index.html

快速开始demo:https://developer.lightbend.com/start/?group=akka

代码示例:https://github.com/akka/akka-samples

https://github.com/typesafehub/activator-akka-java-spring https://github.com/lightbend

Akka 响应式架构 https://book.douban.com/subject/26829063/

Akka实战:Akka实战:快速构建高可用分布式应用 https://book.douban.com/subject/30218333/

Akka入门与实践 https://book.douban.com/subject/27055163/

Akka实战 https://book.douban.com/subject/30431012/

TTT:【技术分享】Akka入门与精品课教务系统分布式实战 http://training.corp.youdao.com/play.php?id=139

官方文档 https://doc.akka.io/docs/akka/current/typed/guide/index.html

akka生态:https://akka.io/docs/

https://developer.lightbend.com/docs/akka-platform-guide/index.html

使用akka开发微服务:https://developer.lightbend.com/docs/akka-platform-guide/microservices-tutorial/index.html

lightbend academy: https://academy.lightbend.com/courses

EBOOK: https://www.lightbend.com/akka-platform/resources

Consul

官方文档 https://www.consul.io/docs

生态:https://www.hashicorp.com/

Hazelcast

优先看:https://training.hazelcast.com/

官方文档IMDG https://hazelcast.org/imdg/docs/

https://docs.hazelcast.com/imdg/latest/getting-started.html

hazelcast文档:https://docs.hazelcast.com/hazelcast/5.1/

hazelcast mc文档:https://docs.hazelcast.com/management-center/latest/getting-started/overview

代码示例:https://github.com/hazelcast/hazelcast-code-samples

https://github.com/hazelcast/hazelcast/wiki

Quick Start:https://hazelcast.org/imdg/?samplelang=Java+Member&sampleindex=0

Mastering Hazelcast https://odoepner.files.wordpress.com/2015/04/mastering_hazelcast1.pdf

最新版本:https://hazelcast.com/resources/mastering-hazelcast/

官方文档PDF https://docs.hazelcast.org/docs/4.0.1/manual/pdf/index.pdf

Jackson

https://github.com/FasterXML/jackson-docs

https://github.com/FasterXML/jackson

https://github.com/FasterXML

ElasticSearch

优先看oreilly出版的《ElasticSearch权威指南》
中文版:https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html(中文有些esquery写的不对,需要辅助英文的看) 英文版:https://www.elastic.co/guide/en/elasticsearch/guide/master/index.html

https://www.elastic.co/guide/index.html

ElasticSearch Elasticsearch源码解析与优化实战 https://book.douban.com/subject/30386800/

Elasticsearch实战 https://book.douban.com/subject/30380439/

深入理解ElasticSearch https://book.douban.com/subject/27066928/

官方文档 https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html

极客时间 62.极客时间-Elasticsearch核心技术与实战

模型:https://dbdb.io/db/elasticsearch

Elasticsearch:权威指南 : https://www.elastic.co/guide/cn/elasticsearch/guide/2.x/index.html

Elasticsearch: The Definitive Guide: A distributed real-time search and analytics engine

IR:https://nlp.stanford.edu/IR-book/information-retrieval-book.html

Kafka

优先看:https://developer.confluent.io/ course:https://developer.confluent.io/learn-kafka/ 其次:https://kafka.apache.org/ 以及 https://cwiki.apache.org/confluence/display/KAFKA/Index

API:https://kafka.apache.org/30/javadoc/index.html

https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Kafka 深入理解Kakfa核心设计与实践原理 https://book.douban.com/subject/30437872/

Apache Kafka源码剖析 https://book.douban.com/subject/27038473/

Apache Kafka实战 https://book.douban.com/subject/30221096/

Kafka权威指南 https://book.douban.com/subject/27665114/

Kafka Streams实战 https://book.douban.com/subject/33425155/

官方文档 https://kafka.apache.org/documentation/#gettingStarted

https://jaceklaskowski.gitbooks.io/mastering-kafka-streams/content/

Canal

Canal 官方文档 https://github.com/alibaba/canal

Gradle

Gradle Gradle实战 https://book.douban.com/subject/26609447/ 官方文档 https://docs.gradle.org/current/userguide/userguide.html

MapDB

官方文档:https://mapdb.org/

https://dbdb.io/db/mapdb

mapdb作者youtube:https://www.youtube.com/user/jankotek

Seata

深入理解分布式事务 冰河

正本清源分布式事务之Seata

Github Repository

https://github.com/xiaozhiliaoo/akka/tree/akka-practice

https://github.com/xiaozhiliaoo/elasticsearch/tree/es-practice

https://github.com/xiaozhiliaoo/kafka/tree/kafka-practice

https://github.com/xiaozhiliaoo/hazelcast/tree/hazelcast-practice