0%

一.*kafka manager简介*

1>.什么是Kafka Manager

1
2
3
4
5
KafkaManager是一个用于管理Kafka的web工具,由yahoo开发并开源。

雅虎官网GitHub项目:https://github.com/yahoo/kafka-manager。需要安装jdk1.8版本。

我们可以参考官方的文档进行编译和简单配置即可使用。

img

2>.kafka manager支持功能

![复制代码](https://common.cnblogs.com/images/copycode.gif

1
2
3
4
5
6
7
8
9
10
11
  其提供了如下管理和监控能力:
    管理多集群
    查看集群状态(topics,consumers,offsets,brokers等)
    执行倾向副本选举
    生成分区重分布计划(自动和手动)
    执行分区重分布
    多topic批量生成分区重分布计划,执行分区重分布
    使用指定参数创建topic
    删除topic(需要集群参数开启)
    现有topic增加分区
    通过JMX获取broker层面和topic层面指标

img

二.部署kafka manager

**1>.**下载kafka-manager

1
2
3
4
[root@node108.yinzhengjie.org.cn ~]# ll
total 67092
-rw-r--r-- 1 root root 68699247 Jul 13 22:08 kafka-manager-1.3.0.7.zip        #这里提供编译好了的包,下载后可以直接使用,可以不用去sbt编译。
[root@node108.yinzhengjie.org.cn ~]#

2>.解压kafka-manager

img [root@node108.yinzhengjie.org.cn ~]# yum -y install unzip zip

img [root@node108.yinzhengjie.org.cn ~]# unzip kafka-manager-1.3.0.7.zip

3>.将解压后的文件放入到你存放软件目录的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[root@node108.yinzhengjie.org.cn ~]# ll
total 67092
drwxr-xr-x 6 root root 70 Jul 13 22:11 kafka-manager-1.3.0.7
-rw-r--r-- 1 root root 68699247 Jul 13 22:08 kafka-manager-1.3.0.7.zip
[root@node108.yinzhengjie.org.cn ~]#
[root@node108.yinzhengjie.org.cn ~]# mv kafka-manager-1.3.0.7 /home/softwares/kafka-manager
[root@node108.yinzhengjie.org.cn ~]#
[root@node108.yinzhengjie.org.cn ~]# ll /home/softwares/kafka-manager/
total 20
drwxr-xr-x 2 root root 52 Jul 13 22:11 bin
drwxr-xr-x 2 root root 81 Jul 13 22:11 conf
drwxr-xr-x 2 root root 8192 Jul 13 22:11 lib
-rw-r--r-- 1 root root 6323 Apr 5 2016 README.md
drwxr-xr-x 3 root root 17 Jul 13 22:11 share
[root@node108.yinzhengjie.org.cn ~]#
[root@node108.yinzhengjie.org.cn ~]# ll
total 67092
-rw-r--r-- 1 root root 68699247 Jul 13 22:08 kafka-manager-1.3.0.7.zip
[root@node108.yinzhengjie.org.cn ~]#
[root@node108.yinzhengjie.org.cn ~]#

4>.修改kafka的配置文件

1
2
3
4
5
6
7
[root@node108.yinzhengjie.org.cn ~]# 
[root@node108.yinzhengjie.org.cn ~]# grep kafka-manager.zkhosts /home/softwares/kafka-manager/conf/application.conf | head -1
kafka-manager.zkhosts="node106.yinzhengjie.org.cn:2181,node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181/kafka01"
[root@node108.yinzhengjie.org.cn ~]#
[root@node108.yinzhengjie.org.cn ~]# grep port /home/softwares/kafka-manager/conf/application.conf | head -1
http.port=8888
[root@node108.yinzhengjie.org.cn ~]#

5>.每个kafka broker都需要开启JMX(想要看到kafka集群的读取河写入速度就必须开启JMX,),需要编辑kafka启动脚本

1
2
3
4
5
6
7
8
[root@node106.yinzhengjie.org.cn ~]# vi /home/softwares/kafka_2.11-0.10.2.1/bin/kafka-server-start.sh 
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# scp /home/softwares/kafka_2.11-0.10.2.1/bin/kafka-server-start.sh node107.yinzhengjie.org.cn:/home/softwares/kafka_2.11-0.10.2.1/bin/kafka-server-start.sh
kafka-server-start.sh 100% 1627 1.3MB/s 00:00
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# scp /home/softwares/kafka_2.11-0.10.2.1/bin/kafka-server-start.sh node108.yinzhengjie.org.cn:/home/softwares/kafka_2.11-0.10.2.1/bin/kafka-server-start.sh
kafka-server-start.sh 100% 1627 2.0MB/s 00:00
[root@node106.yinzhengjie.org.cn ~]#

6>.启动kafka-manager(kafka和zookeeper也需要启动起来哟~尤其是zookeeper必须是得运行的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
[root@node108.yinzhengjie.org.cn ~]# 
[root@node108.yinzhengjie.org.cn ~]# nohup /home/softwares/kafka-manager/bin/kafka-manager -java-home /home/softwares/jdk1.8.0_201 -Dconfig.file=/home/softwares/kafka-manager/conf/application.conf >> /home/softwares/kafka-manager/kafka_manager.log 2>&1 &
[1] 3753
[root@node108.yinzhengjie.org.cn ~]#
[root@node108.yinzhengjie.org.cn ~]# jps
3826 Jps
3753 ProdServerStart
[root@node108.yinzhengjie.org.cn ~]#

[root@node108.yinzhengjie.org.cn ~]# tail -100f /home/softwares/kafka-manager/kafka_manager.log

.....

[info] o.a.z.ZooKeeper - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
[info] o.a.z.ZooKeeper - Client environment:java.io.tmpdir=/tmp
[info] o.a.z.ZooKeeper - Client environment:java.compiler=<NA>
[info] o.a.z.ZooKeeper - Client environment:os.name=Linux
[info] o.a.z.ZooKeeper - Client environment:os.arch=amd64
[info] o.a.z.ZooKeeper - Client environment:os.version=3.10.0-957.el7.x86_64
[info] o.a.z.ZooKeeper - Client environment:user.name=root
[info] o.a.z.ZooKeeper - Client environment:user.home=/root
[info] o.a.z.ZooKeeper - Client environment:user.dir=/home/softwares/kafka-manager
[info] o.a.z.ZooKeeper - Initiating client connection, connectString=node106.yinzhengjie.org.cn:2181,node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181/kafka01 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@65e0186a
[info] o.a.z.ClientCnxn - Opening socket connection to server node106.yinzhengjie.org.cn/172.30.1.106:2181. Will not attempt to authenticate using SASL (unknown error)
[info] o.a.z.ClientCnxn - Socket connection established to node106.yinzhengjie.org.cn/172.30.1.106:2181, initiating session
[info] o.a.z.ClientCnxn - Session establishment complete on server node106.yinzhengjie.org.cn/172.30.1.106:2181, sessionid = 0x6a00001622cf0000, negotiated timeout = 40000
[info] k.m.a.KafkaManagerActor - zk=node106.yinzhengjie.org.cn:2181,node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181/kafka01
[info] k.m.a.KafkaManagerActor - baseZkPath=/kafka-manager
[info] k.m.a.KafkaManagerActor - Started actor akka://kafka-manager-system/user/kafka-manager
[info] k.m.a.KafkaManagerActor - Starting delete clusters path cache...
[info] k.m.a.DeleteClusterActor - Started actor akka://kafka-manager-system/user/kafka-manager/delete-cluster
[info] k.m.a.DeleteClusterActor - Starting delete clusters path cache...
[info] k.m.a.KafkaManagerActor - Starting kafka manager path cache...
[info] k.m.a.DeleteClusterActor - Adding kafka manager path cache listener...
[info] k.m.a.DeleteClusterActor - Scheduling updater for 10 seconds
[info] k.m.a.KafkaManagerActor - Adding kafka manager path cache listener...
[info] play.api.Play - Application started (Prod)
[info] p.c.s.NettyServer - Listening for HTTP on /0.0.0.0:8888      #说明我们的kafka_manager服务端口已经监听啦,我们可以直接去访问啦
[info] k.m.a.KafkaManagerActor - Updating internal state...
[info] k.m.a.KafkaManagerActor - Updating internal state...
[info] k.m.a.KafkaManagerActor - Updating internal state...
^C
[root@node108.yinzhengjie.org.cn ~]#

7>.登陆webUI查看相应的信息,如果出现以下界面说明你部署成功啦!

img

**三.**WEB端配置kafka-manager

1>.点击添加集群

img

2>.配置集群(集群名称不支持中文,只支持ASSCI编码)

img

3>.其他参数保持默认,点击保存即可

img

4>.配置成功

img

5>.点击”yinzhengjie-kafka”

img

6>.查看自定义的集群信息

img

7>.*查看broker信息*

img

8>.*查看topic信息*

*img*

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
以上输出信息关键参数说如下:
Operations列表参数说明:
Generate Partition Assignments:
  对多个topic来进行分区重分布计划的生成,知道就好,不推荐使用。

Run Partition Assignments:
  对多个topic分区执行重分布计划,知道就好,不推荐使用。

Add Partitions:
  对多个topic来进行添加分区操作,生成环境并不建议大家这样搞,避免误操作。

Topics列表参数说明:
Topic:
表示topic名称。
__consumer_offsets是kafka内置保存consumer offset的topic.

Partitions:
topic对应的分区数

Brokers:
topic对应的broker数

Brokers Spread%:
指broker的分布比例,计算公式:用topic的分区分布的broker数/总的broker数量。

Brokers Skew %:
Brokers的偏斜比例,计算公式:(Broker最多的分区数/Brokers最少的分区数) -1

Brokers Leader Skew%:
Brokers的leader偏斜比例,计算公式:(Broker最多的leader分区数/Brokers最少leader的分区数) -1

Replicas:
副本因子,即副本数。

Under Replicated %:
副本不足的比例,计算公式:副本不足的分区数/总分区数

Producer Message/Sec:
每秒产生的消息数量。

Summed Recent Offsets:
当前总计的消费偏移量。

*9>.创建topic*

img

*10>.topic创建完成*

*img*

*11>.topic操作*

img

*12>.点击”Reassign Partitions”,查看分区重分布的进度*

img

*13>.点击”Preferred Replica Election”可以执行倾向副本选举,一般我们在配置文件配置好了就不需要在这里手动触发啦!*

img

*14>.查看消费者组信息*

img

1
2
3
4
5
6
7
8
9
10
11
12
Consumer:
表示的是消费者组。

Type:
ZK表示offset信息保存在zookeeper中。
KF表示offset信息保存在kafka中。

Topic it consumes from:
已经消费的分区的coverage:
消费的覆盖率。
已经消费的lag:
消费的延迟大小。有可能会看到负值,

*15>.查看已经消费者的详细信息*

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Partition    
分区编号。

LogSize
分区大小

Consumer Offset
消费的偏移量。

Lag
延迟大小,改值可能是负数。

Consumer Instance Owner
消费者实例拥有者。

使用redis也有端时间了,现在讲开发中遇到的几个常见异常总结如下:

一、通过JedisPool类实例获取getResource()时抛出can’t get a resource异常。

异常代码如下:

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool

at redis.clients.util.Pool.getResource(Pool.java:22)

分析:

redis.clients.util.Pool.getResource会从JedisPool实例池中返回一个可用的redis连接。分析源码可知JedisPool extends redis.clients.util.Pool .而Pool是通过

commons-pool开源工具包中的org.apache.commons.pool2.impl.GenericObjectPool来实现对Jedis实例的管理的。所以我们分析一下GenericObjectPool或许能找到答案。

其中三个重要个几个属性是:

MaxActive: 最大连接数。

MaxIdle: 最大空闲数。

MaxWait: 最大等待时间,单位毫秒(million seconds)。

当连接池中无可用连接时会会进行等待maxWait时间,若超出泽抛Could not get a resource from the pool异常。

所以应根据程序实际情况合理设置这三个参数的值,尽量避免这个异常。

二、对redis进行操作时,抛出redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out异常。

异常代码如下:

redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out

at redis.clients.jedis.Protocol.process(Protocol.java:79)

at redis.clients.jedis.Protocol.read(Protocol.java:131)

at redis.clients.jedis.Connection.getIntegerReply(Connection.java:188)

at redis.clients.jedis.Jedis.sismember(Jedis.java:1266)

分析:

Redis是对内存进行操作,速度应该都在毫秒级,这是我们通常的认识,

那么Redis操作怎么会出现几秒的超时时间?

我们还是先分析一下Jedis的源代码吧,以sadd操作为例:

public Long sadd(final String key, final String… members) {

checkIsInMulti();

client.sadd(key, members);

return client.getIntegerReply();

}

client是redis.clients.jedis.Client.java的实例,继承关系如下:

public class Client extends BinaryClient implements Commands;

public class BinaryClient extends Connection;

Connection包装了对Redis server的socket操作,命令写操作通过socket.getOutputStream()输出流将命令信息发送到redis server,当写完命令后要通过socket.getInputStream()的到的输入流将

命令执行结果返回,这中间必然会有一个命令执行到结果返回的延时时间,这就是一个Jedis调用redis命令操作所用的时间。

需要说明的是,Redis server是单线程执行所有连接发送过来的命令的,也就是说不管并发中有多少个client在发送命令,redis-server端是单线程处理的,并按照默认的FIFO方式处理请求,

这个可在redis.conf配置文件中配置。关于redis server的详细运行机制参见:http://redis.io/documentation

所以client.sadd(key, members);调用完后只是将命令信息发送到了redis server端,具体有没有执行要看redis server的负载情况。然后,通过client.getIntegerReply();等待(time out)返回结果。

Connection初始化socket时有多种选择,其中设置socket time out 的方法如下:

public void rollbackTimeout() {

​ try {

​ socket.setSoTimeout(timeout);

​ socket.setKeepAlive(false);

​ } catch (SocketException ex) {

​ throw new JedisException(ex);

​ }

​ }

由redis.clients.jedis.Protocol.DEFAULT_TIMEOUT = 2000 我们知道默认的超时时间是2秒,这个时间相对于redis操作内存毫秒级的速度来说已经很长,那我们为什么还会遇到

ava.net.SocketTimeoutException: Read timed out异常呢?redis操作内存虽然平均毫秒级的,但当数据量很大时未必都如此快速。在我的开发过程中就遇到过一个集合到了

千万级数据量,一次操作超时时间在秒级是很正常的,而且机器性能很好的情况下已经如此。

所以在初始化JedisPool时应该根据实际

情况通过redis.clients.jedis.JedisPoolConfig合理设置连接池参数,通过edisPool构造方法,合理设置socket读取输入InputStream的超时时间。

pool = new JedisPool(config, host, port, 100000);

注意第四个参数time out,设置成我们能容忍的超时时间,单位是毫秒。

设置第四个参数后,问题基本解决。

idea循环依赖aAnnotation processing is not supported for module cycles.

  1. 错误现象
    Error:java: Annotation processing is not supported for module cycles. Please ensure that all modules from cycle [book-rpc,book-api] are excluded from annotation processing

通过Analyze->Analyze Module Dependencies…发现是循环依赖。

2.解决办法
先是找到这个依赖,删掉相关的(右击第一个module选中open module setting–>Dependencies)

然后在book-api中的pom.xml找到第二个这个依赖,删掉,重新import Changes need to be imported

如果仅仅删除module中的Dependencies中的相关依赖,有可能没有解决根本性的问题,所以要看一下pom.xml中是否已经存在了相关依赖

前言

理解一下Kafka的读的自动提交功能。

找到了一篇专门介绍这个功能的文章,选择主要的内容进行一下翻译和做笔记。

正文

Understanding the ‘enable.auto.commit’ Kafka Consumer property

img

Kafka Consumers read messages from a Kafka topic, its not a hard concept to get your head around. But behind the scenes there’s a lot more going on than meets the eye.

Say we’re consuming messages from a Topic and our Consumer crashes. Once we realise that the world isn’t ending, we recover from the crash and we start consuming again. We start receiving messages exactly where we left off from, its kinda neat.

假设我们正在从一个 Topic 中消费消息,这个时候我们的这个消费者(客户端)宕机了。我们意识到这不是世界的末日,我们可以从宕机中恢复,重新开始消费。我们可以从我们上一次离开的地方重新接收消息,这非常灵巧。

There’s two reasons as to why this happens. One is something referred to as the “Offset” and the other is a couple of default Consumer values.

发生这样的事情是因为两个原因。一个是一个叫 “Offset” 的东西,另外一个是一些 Consumer 的默认的值。

So whats an Offset?

The Offset is a piece of metadata, an integer value that continually increases for each message that is received in a partition. Each message will have a unique Offset value in a partition.

Offset 是一块元数据,一个整数,会针对每一个 partition 上接收到的消息而持续增长。每一个消息在一个 partition 上将会有唯一的一个Offset。

img

I use Keys in some of my projects, some of them I don’t ;)

So as you can see here, each message has a unique Offset, and that Offset represents the position of that message in that particular partition.

上面介绍了一下Kafka的offset是什么,offset是记录每条消息在partition里面的位置的。

When a Consumer reads the messages from the Partition it lets Kafka know the Offset of the last consumed message. This Offset is stored in a Topic named _consumer_offsets, in doing this a consumer can stop and restart without forgetting which messages it has consumed.

这里讲,offset会被存在一个叫做**_consumer_offsets**的主题中,这样来帮助消费者记录处理到哪里了。

When we create our Consumers, they have a set of default properties which we can override or we can just leave the default values in effect.

There are two properties that are driving this behaviour.

有两个属性需要关注。

1
2
3
4
5
enable.auto.commit



auto.commit.interval.ms

The first property enable.auto.commit has a default value of true and the second property auto.commit.interval.ms has a default value of 5000. These values are correct for Blizzards node-rdkafka client and the Java KafkaConsumer client but other libraries may differ.

enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。

auto.commit.interval.ms 的默认值是 5000,单位是毫秒。

So by default every 5 seconds a Consumer is going to commit its Offset to Kafka or every time data is fetched from the specified Topic it will commit the latest Offset.

这样,默认5秒钟,一个 Consumer 将会提交它的 Offset 给 Kafka,或者每一次数据从指定的 Topic 取回时,将会提交最后一次的 Offset。

Now in some scenarios this is the ideal behaviour but on other scenarios its not.

这样,在某些场景下,这是理想的表现,但是在其他场景下,并不是。

Say our Consumer is processing a message with an Offset of 100 and whilst processing it the Consumer fetches some more data, the Offset is commit and then the Consumer crashes. Upon coming back up it will start consuming messages from the most recent committed Offset, but how can we safely say that we haven’t lost messages and the Offset of the new message isn’t later then the one of the message been processed?

这么说,我们的 Consumer 正在消费一个 Offset 是100的消息,同时这个 Consumer 取回了一些数据,这个 Offset 提交了,然后 Consumer 崩溃了。在我们回来的时候,我们会重新从最新提交的 Offset 去进行消息的消费,但是我们如何能安全地说,我们没有丢失消息,并且这个新消息的 Offset 不会比刚刚被处理的那个消息靠后呢?

What we can do is commit the Offset of messages manually after processing them. This give us full control over when we consider a message dealt with, processed and ready to let Kafka know that.

解决这个问题的方案就是我们手动地提交这个 Offset,在处理完这些消息之后。这给与了我们完全的控制,什么时候去处理一个消息,什么时候去让 Kafka 知道这个。

Firstly we have to change the value of the enable.auto.commit property.

1
enable.auto.commit: false

When we change this property the auto.commit.interval.ms value isnt taken into consideration.

So now we can commit our Offset manually after the processing has taken place and if the Consumer crashes whilst processing a message it will start consuming from that same Offset, no messages lost.

我们把这个参数设置为 false ,就会由我们自己手动地来处理这个事情。

Both the clients mentioned earlier in this article have methods exposed to commit the Offset.

For further reading on the clients check out the links below.

如果 enable.auto.commit 设置成 false,那么 auto.commit.interval.ms 也就不被再考虑了

JSDoc: Class: KafkaConsumer
KafkaConsumer class for reading messages from Kafka This is the main entry point for reading data from Kafka. You…blizzard.github.io

KafkaConsumer (kafka 0.10.2.1 API)
To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the…kafka.apache.org

If anyone wants any more information on Kafka or Consumers get in touch on Twitter.

Cheers,

Danny

https://twitter.com/danieljameskay

参考

https://medium.com/@danieljameskay/understanding-the-enable-auto-commit-kafka-consumer-property-12fa0ade7b65

https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html 这里是官网介绍如何使用consumer

注意:配置基于Kafka 0.8.2.1

broker配置

#非负整数,用于唯一标识broker
broker.id 0

#kafka持久化数据存储的路径,可以指定多个,以逗号分隔
log.dirs /tmp/kafka-logs

#broker接收连接请求的端口
port 9092

#指定zk连接字符串,[hostname:port]以逗号分隔
zookeeper.connect

#单条消息最大大小控制,消费端的最大拉取大小需要略大于该值
message.max.bytes 1000000

#接收网络请求的线程数
num.network.threads 3

#用于执行请求的I/O线程数
num.io.threads 8

#用于各种后台处理任务(如文件删除)的线程数
background.threads 10

#待处理请求最大可缓冲的队列大小
queued.max.requests 500

#配置该机器的IP地址
host.name

#默认分区个数
num.partitions 1

#分段文件大小,超过后会轮转
log.segment.bytes 1024 * 1024 * 1024

#日志没达到大小,如果达到这个时间也会轮转
log.roll.{ms,hours} 168

#日志保留时间
log.retention.{ms,minutes,hours}

#不存在topic的时候是否自动创建
auto.create.topics.enable true

#partition默认的备份因子
default.replication.factor 1

#如果这个时间内follower没有发起fetch请求,被认为dead,从ISR移除
replica.lag.time.max.ms 10000

#如果follower相比leader落后这么多以上消息条数,会被从ISR移除
replica.lag.max.messages 4000

#从leader可以拉取的消息最大大小
replica.fetch.max.bytes 1024 * 1024

#从leader拉取消息的fetch线程数
num.replica.fetchers 1

#zk会话超时时间
zookeeper.session.timeout.ms 6000

#zk连接所用时间
zookeeper.connection.timeout.ms

#zk follower落后leader的时间
zookeeper.sync.time.ms 2000

#是否开启topic可以被删除的方式
delete.topic.enable false

producer配置

#参与消息确认的broker数量控制,0代表不需要任何确认 1代表需要leader replica确认 -1代表需要ISR中所有进行确认
request.required.acks 0

#从发送请求到收到ACK确认等待的最长时间(超时时间)
request.timeout.ms 10000

#设置消息发送模式,默认是同步方式, async异步模式下允许消息累计到一定量或一段时间又另外线程批量发送,吞吐量好但丢失数据风险增大
producer.type sync

#消息序列化类实现方式,默认是byte[]数组形式
serializer.class kafka.serializer.DefaultEncoder

#kafka消息分区策略实现方式,默认是对key进行hash
partitioner.class kafka.producer.DefaultPartitioner

#对发送的消息采取的压缩编码方式,有none|gzip|snappy
compression.codec none

#指定哪些topic的message需要压缩
compressed.topics null

#消息发送失败的情况下,重试发送的次数 存在消息发送是成功的,只是由于网络导致ACK没收到的重试,会出现消息被重复发送的情况
message.send.max.retries 3

#在开始重新发起metadata更新操作需要等待的时间
retry.backoff.ms 100

#metadata刷新间隔时间,如果负值则失败的时候才会刷新,如果0则每次发送后都刷新,正值则是一种周期行为
topic.metadata.refresh.interval.ms 600 * 1000

#异步发送模式下,缓存数据的最长时间,之后便会被发送到broker
queue.buffering.max.ms 5000

#producer端异步模式下最多缓存的消息条数
queue.buffering.max.messages 10000

#0代表队列没满的时候直接入队,满了立即扔弃,-1代表无条件阻塞且不丢弃
queue.enqueue.timeout.ms -1

#一次批量发送需要达到的消息条数,当然如果queue.buffering.max.ms达到的时候也会被发送
batch.num.messages 200

consumer配置

#指明当前消费进程所属的消费组,一个partition只能被同一个消费组的一个消费者消费
group.id

#针对一个partition的fetch request所能拉取的最大消息字节数,必须大于等于Kafka运行的最大消息
fetch.message.max.bytes 1024 * 1024

#是否自动周期性提交已经拉取到消费端的消息offset
auto.commit.enable true

#自动提交offset到zookeeper的时间间隔
auto.commit.interval.ms 60 * 1000

#消费均衡的重试次数
rebalance.max.retries 4

#消费均衡两次重试之间的时间间隔
rebalance.backoff.ms 2000

#当重新去获取partition的leader前需要等待的时间
refresh.leader.backoff.ms 200

#如果zookeeper上没有offset合理的初始值情况下获取第一条消息开始的策略smallest|largeset
auto.offset.reset largest

#如果其超时,将会可能触发rebalance并认为已经死去
zookeeper.session.timeout.ms 6000

#确认zookeeper连接建立操作客户端能等待的最长时间
zookeeper.connection.timeout.ms 6000

参考连接:

https://blog.csdn.net/huanggang028/article/details/47830529

一、简介

1、循环缓冲区的实现原理

环形缓冲区通常有一个读指针和一个写指针。读指针指向环形缓冲区中可读的数据,写指针指向环形缓冲区中可写的缓冲区。通过移动读指针和写指针就可以实现缓冲区的数据读取和写入。在通常情况下,环形缓冲区的读用户仅仅会影响读指针,而写用户仅仅会影响写指针。如果仅仅有一个读用户和一个写用户,那么不需要添加互斥保护机制就可以保证数据的正确性。如果有多个读写用户访问环形缓冲区,那么必须添加互斥保护机制来确保多个用户互斥访问环形缓冲区。

2、概念

关于循环缓冲区(Ring Buffer)的概念,其实来自于Linux内核(Maybe),是为解决某些特殊情况下的竞争问题提供了一种免锁的方法。这种特殊的情况就是当生产者和消费者都只有一个,而在其它情况下使用它也是必须要加锁的。对应在Linux内核中有对它的定义:

struct kfifo {

​ unsigned char *buffer;

​ unsigned int size;

​ unsigned int in;

​ unsigned int out;

​ spinlock_t *lock;

};

其中buffer指向存放数据的缓冲区,size是缓冲区的大小,in是写指针下标,out是读指针下标,lock是加到struct kfifo上的自旋锁(上面说不加锁不是这里的锁),防止多个进程并发访问此数据结构。当in==out时,说明缓冲区为空;当(in-out)==size时,说明缓冲区已满。

img

注:我们保有对应的读写指针,当第一批数据(蓝色)完成,第二批数据(红色)会根据当前的写指针位置继续我们的数据操作,当达到最大的Buffer_Size时,会重新回到Buffer的开始端。

我们更多要说的是Ring Buffer关于在我们在日志处理方面的一个应用,我们知道对于Program来说日志记录提供了故障前应用程序状态的详细信息,在一段时间的运行过程中,会将不断地产生大量的跟踪数据,以及调试信息并持续地将其写入到磁盘上的文本文件中。多亿进行有效的日志记录,需要使用大量的磁盘空间,并且在多线程环境中,所需的磁盘空间会成倍地增加。常规的日志处理来说存在一些问题,比如硬盘空间的可用性,以及在对一个文件写入数据时磁盘 I/O 的速度较慢。持续地对磁盘进行写入操作可能会极大地降低程序的性能,导致其运行速度缓慢。通常,可以通过使用日志轮换策略来解决空间问题,将日志保存在几个文件中,当这些文件大小达到某个预定义的字节数时,对它们进行截断和覆盖。

所以要克服空间问题并实现磁盘 I/O 的最小化,某些程序可以将它们的跟踪数据记录在内存中,仅当请求时才转储这些数据。这个循环的、内存中的缓冲区称为循环缓冲区。它可以将相关的数据保存在内存中,而不是每次都将其写入到磁盘上的文件中。在需要的时候(比如当用户请求将内存数据转储到文件中时、程序检测到一个错误时,或者由于非法的操作或者接收到的信号而引起程序崩溃时)可以将内存中的数据转储到磁盘。循环缓冲区日志记录由一个固定大小的内存缓冲区构成,进程使用这个内存缓冲区进行日志记录。

当然现在我们面对的大多是多线程的协同工作,对于日志记录来说,倘若采取传统的加锁机制访问我们的存储文件,这些线程将在获得和释放锁上花费了大部分的时间,所以采取循环缓冲区会是一个不错的办法。通过使得每个线程将数据写入到它自己的内存块,就可以完全避免同步问题。当收到来自用户的转储数据的请求时,每个线程获得一个锁,并将其转储到中心位置。或者分配一个很大的全局内存块,并将其划分为较小的槽位,其中每个槽位都可由一个线程用来进行日志记录。每个线程只能够读写它自己的槽位,而不是整个缓冲区。当每个线程第一次尝试写入数据时,它会尝试寻找一个空的内存槽位,并将其标记为忙碌。当线程获得了一个特定的槽位时,可以将跟踪槽位使用情况的位图中相应的位设置为1,当该线程退出时,重新将这个位设置为 0。在这里需要同时需要维护当前使用的槽位编号的全局列表,以及正在使用它的线程的线程信息。

但是这里需要注意的是当一个线程已经死亡,却没有释放相应的槽位,并在垃圾收集器释放该槽位之前,再次使用了这个线程 ID 并为其分配一个新的槽位。对于新的线程来说,检查全局列表并且重用相同的槽位(如果以前的实例使用了它的话),这是非常重要的。因为垃圾收集器线程和写入者线程可能同时尝试修改全局列表,所以同样也需要使用某种锁定机制。

二、Ring Buffer的优点

我们使用 Ring Buffer 这种数据结构,是因为它给我们提供了可靠的消息传递特性。这个理由就足够了,不过它还有一些其他的优点。

首先,Ring Buffer 比链表要快,因为它是数组,而且有一个容易预测的访问模式。这很不错,对 CPU 高速缓存友好 (CPU-cache-friendly)-数据可以在硬件层面预加载到高速缓存,因此 CPU 不需要经常回到主内存 RAM 里去寻找 Ring Buffer 的下一条数据。

第二点,Ring Buffer 是一个数组,你可以预先分配内存,并保持数组元素永远有效。这意味着内存垃圾收集(GC)在这种情况下几乎什么也不用做。此外,也不像链表那样每增加一条数据都要创建对象-当这些数据从链表里删除时,这些对象都要被清理掉。

参考网址:

https://blog.csdn.net/sim_szm/article/details/17011545

图片说明:

环形缓冲区的应用ringbuffer - CSDN博客

作者:萧修
链接:https://www.jianshu.com/p/8499d9603544
来源:简书

1. synchronized简介

在学习知识前,我们先来看一个现象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SynchronizedDemo implements Runnable {
private static int count = 0;

public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new SynchronizedDemo());
thread.start();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("result: " + count);
}

@Override
public void run() {
for (int i = 0; i < 1000000; i++)
count++;
}
}

开启了10个线程,每个线程都累加了1000000次,如果结果正确的话自然而然总数就应该是10 * 1000000 = 10000000。可就运行多次结果都不是这个数,而且每次运行结果都不一样。这是为什么了?有什么解决方案了?这就是我们今天要聊的事情。

在上一篇博文中我们已经了解了java内存模型的一些知识,并且已经知道出现线程安全的主要来源于JMM的设计,主要集中在主内存和线程的工作内存而导致的内存可见性问题,以及重排序导致的问题,进一步知道了happens-before规则。线程运行时拥有自己的栈空间,会在自己的栈空间运行,如果多线程间没有共享的数据也就是说多线程间并没有协作完成一件事情,那么,多线程就不能发挥优势,不能带来巨大的价值。那么共享数据的线程安全问题怎样处理?很自然而然的想法就是每一个线程依次去读写这个共享变量,这样就不会有任何数据安全的问题,因为每个线程所操作的都是当前最新的版本数据。那么,在java关键字synchronized就具有使每个线程依次排队操作共享变量的功能。很显然,这种同步机制效率很低,但synchronized是其他并发容器实现的基础,对它的理解也会大大提升对并发编程的感觉,从功利的角度来说,这也是面试高频的考点。好了,下面,就来具体说说这个关键字。

2. synchronized实现原理

在java代码中使用synchronized可是使用在代码块和方法中,根据Synchronized用的位置可以有这些使用场景:

img

Synchronized的使用场景

如图,synchronized可以用在方法上也可以使用在代码块中,其中方法是实例方法和静态方法分别锁的是该类的实例对象和该类的对象。而使用在代码块中也可以分为三种,具体的可以看上面的表格。这里的需要注意的是:如果锁的是类对象的话,尽管new多个实例对象,但他们仍然是属于同一个类依然会被锁住,即线程之间保证同步关系

现在我们已经知道了怎样synchronized了,看起来很简单,拥有了这个关键字就真的可以在并发编程中得心应手了吗?爱学的你,就真的不想知道synchronized底层是怎样实现了吗?

2.1 对象锁(monitor)机制

现在我们来看看synchronized的具体底层实现。先写一个简单的demo:

1
2
3
4
5
6
7
8
9
10
public class SynchronizedDemo {
public static void main(String[] args) {
synchronized (SynchronizedDemo.class) {
}
method();
}

private static void method() {
}
}

上面的代码中有一个同步代码块,锁住的是类对象,并且还有一个同步静态方法,锁住的依然是该类的类对象。编译之后,切换到SynchronizedDemo.class的同级目录之后,然后用javap -v SynchronizedDemo.class查看字节码文件:

img

SynchronizedDemo.class

如图,上面用黄色高亮的部分就是需要注意的部分了,这也是添Synchronized关键字之后独有的。执行同步代码块后首先要先执行monitorenter指令,退出的时候monitorexit指令。通过分析之后可以看出,使用Synchronized进行同步,其关键就是必须要对对象的监视器monitor进行获取,当线程获取monitor后才能继续往下执行,否则就只能等待。而这个获取的过程是互斥的,即同一时刻只有一个线程能够获取到monitor。上面的demo中在执行完同步代码块之后紧接着再会去执行一个静态同步方法,而这个方法锁的对象依然就这个类对象,那么这个正在执行的线程还需要获取该锁吗?答案是不必的,从上图中就可以看出来,执行静态同步方法的时候就只有一条monitorexit指令,并没有monitorenter获取锁的指令。这就是锁的重入性,即在同一锁程中,线程不需要再次获取同一把锁。Synchronized先天具有重入性。每个对象拥有一个计数器,当线程获取该对象锁后,计数器就会加一,释放锁后就会将计数器减一

任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取该对象的监视器才能进入同步块和同步方法,如果没有获取到监视器的线程将会被阻塞在同步块和同步方法的入口处,进入到BLOCKED状态(关于线程的状态可以看这篇文章

下图表现了对象,对象监视器,同步队列以及执行线程状态之间的关系:

img

对象,对象监视器,同步队列和线程状态的关系

该图可以看出,任意线程对Object的访问,首先要获得Object的监视器,如果获取失败,该线程就进入同步状态,线程状态变为BLOCKED,当Object的监视器占有者释放后,在同步队列中得线程就会有机会重新获取该监视器。

2.2 synchronized的happens-before关系

在上一篇文章中讨论过happens-before规则,抱着学以致用的原则我们现在来看一看Synchronized的happens-before规则,即监视器锁规则:对同一个监视器的解锁,happens-before于对该监视器的加锁。继续来看代码:

1
2
3
4
5
6
7
8
9
10
11
public class MonitorDemo {
private int a = 0;

public synchronized void writer() { // 1
a++; // 2
} // 3

public synchronized void reader() { // 4
int i = a; // 5
} // 6
}

该代码的happens-before关系如图所示:

img

Synchronized的happens-before关系

在图中每一个箭头连接的两个节点就代表之间的happens-before关系,黑色的是通过程序顺序规则推导出来,红色的为监视器锁规则推导而出:线程A释放锁happens-before线程B加锁,蓝色的则是通过程序顺序规则和监视器锁规则推测出来happens-befor关系,通过传递性规则进一步推导的happens-before关系。现在我们来重点关注2 happens-before 5,通过这个关系我们可以得出什么?

根据happens-before的定义中的一条:如果A happens-before B,则A的执行结果对B可见,并且A的执行顺序先于B。线程A先对共享变量A进行加一,由2 happens-before 5关系可知线程A的执行结果对线程B可见即线程B所读取到的a的值为1。

2.3 锁获取和锁释放的内存语义

在上一篇文章提到过JMM核心为两个部分:happens-before规则以及内存抽象模型。我们分析完Synchronized的happens-before关系后,还是不太完整的,我们接下来看看基于java内存抽象模型的Synchronized的内存语义。

废话不多说依旧先上图。

img

线程A写共享变量

从上图可以看出,线程A会首先先从主内存中读取共享变量a=0的值然后将该变量拷贝到自己的本地内存,进行加一操作后,再将该值刷新到主内存,整个过程即为线程A 加锁–>执行临界区代码–>释放锁相对应的内存语义。

img

线程B读共享变量

线程B获取锁的时候同样会从主内存中共享变量a的值,这个时候就是最新的值1,然后将该值拷贝到线程B的工作内存中去,释放锁的时候同样会重写到主内存中。

从整体上来看,线程A的执行结果(a=1)对线程B是可见的,实现原理为:释放锁的时候会将值刷新到主内存中,其他线程获取锁时会强制从主内存中获取最新的值。另外也验证了2 happens-before 5,2的执行结果对5是可见的。

从横向来看,这就像线程A通过主内存中的共享变量和线程B进行通信,A 告诉 B 我们俩的共享数据现在为1啦,这种线程间的通信机制正好吻合java的内存模型正好是共享内存的并发模型结构。

3. synchronized优化

通过上面的讨论现在我们对Synchronized应该有所印象了,它最大的特征就是在同一时刻只有一个线程能够获得对象的监视器(monitor),从而进入到同步代码块或者同步方法之中,即表现为互斥性(排它性)。这种方式肯定效率低下,每次只能通过一个线程,既然每次只能通过一个,这种形式不能改变的话,那么我们能不能让每次通过的速度变快一点了。打个比方,去收银台付款,之前的方式是,大家都去排队,然后去纸币付款收银员找零,有的时候付款的时候在包里拿出钱包再去拿出钱,这个过程是比较耗时的,然后,支付宝解放了大家去钱包找钱的过程,现在只需要扫描下就可以完成付款了,也省去了收银员跟你找零的时间的了。同样是需要排队,但整个付款的时间大大缩短,是不是整体的效率变高速率变快了?这种优化方式同样可以引申到锁优化上,缩短获取锁的时间,伟大的科学家们也是这样做的,令人钦佩,毕竟java是这么优秀的语言(微笑脸)。

在聊到锁的优化也就是锁的几种状态前,有两个知识点需要先关注:(1)CAS操作 (2)Java对象头,这是理解下面知识的前提条件。

3.1 CAS操作

3.1.1 什么是CAS?

使用锁时,线程获取锁是一种悲观锁策略,即假设每一次执行临界区代码都会产生冲突,所以当前线程获取到锁的时候同时也会阻塞其他线程获取该锁。而CAS操作(又称为无锁操作)是一种乐观锁策略,它假设所有线程访问共享资源的时候不会出现冲突,既然不会出现冲突自然而然就不会阻塞其他线程的操作。因此,线程就不会出现阻塞停顿的状态。那么,如果出现冲突了怎么办?无锁操作是使用**CAS(compare and swap)**又叫做比较交换来鉴别线程是否出现冲突,出现冲突就重试当前操作直到没有冲突为止。

3.1.2 CAS的操作过程

CAS比较交换的过程可以通俗的理解为CAS(V,O,N),包含三个值分别为:V 内存地址存放的实际值;O 预期的值(旧值);N 更新的新值。当V和O相同时,也就是说旧值和内存中实际的值相同表明该值没有被其他线程更改过,即该旧值O就是目前来说最新的值了,自然而然可以将新值N赋值给V。反之,V和O不相同,表明该值已经被其他线程改过了则该旧值O不是最新版本的值了,所以不能将新值N赋给V,返回V即可。当多个线程使用CAS操作一个变量是,只有一个线程会成功,并成功更新,其余会失败。失败的线程会重新尝试,当然也可以选择挂起线程

CAS的实现需要硬件指令集的支撑,在JDK1.5后虚拟机才可以使用处理器提供的CMPXCHG指令实现。

Synchronized VS CAS

元老级的Synchronized(未优化前)最主要的问题是:在存在线程竞争的情况下会出现线程阻塞和唤醒锁带来的性能问题,因为这是一种互斥同步(阻塞同步)。而CAS并不是武断的间线程挂起,当CAS操作失败后会进行一定的尝试,而非进行耗时的挂起唤醒的操作,因此也叫做非阻塞同步。这是两者主要的区别。

3.1.3 CAS的应用场景

在J.U.C包中利用CAS实现类有很多,可以说是支撑起整个concurrency包的实现,在Lock实现中会有CAS改变state变量,在atomic包中的实现类也几乎都是用CAS实现,关于这些具体的实现场景在之后会详细聊聊,现在有个印象就好了(微笑脸)。

3.1.4 CAS的问题

1. ABA问题
因为CAS会检查旧值有没有变化,这里存在这样一个有意思的问题。比如一个旧值A变为了成B,然后再变成A,刚好在做CAS时检查发现旧值并没有变化依然为A,但是实际上的确发生了变化。解决方案可以沿袭数据库中常用的乐观锁方式,添加一个版本号可以解决。原来的变化路径A->B->A就变成了1A->2B->3C。java这么优秀的语言,当然在java 1.5后的atomic包中提供了AtomicStampedReference来解决ABA问题,解决思路就是这样的。

2. 自旋时间过长

使用CAS时非阻塞同步,也就是说不会将线程挂起,会自旋(无非就是一个死循环)进行下一次尝试,如果这里自旋时间过长对性能是很大的消耗。如果JVM能支持处理器提供的pause指令,那么在效率上会有一定的提升。

3. 只能保证一个共享变量的原子操作

当对一个共享变量执行操作时CAS能保证其原子性,如果对多个共享变量进行操作,CAS就不能保证其原子性。有一个解决方案是利用对象整合多个共享变量,即一个类中的成员变量就是这几个共享变量。然后将这个对象做CAS操作就可以保证其原子性。atomic中提供了AtomicReference来保证引用对象之间的原子性。

3.2 Java对象头

在同步的时候是获取对象的monitor,即获取到对象的锁。那么对象的锁怎么理解?无非就是类似对对象的一个标志,那么这个标志就是存放在Java对象的对象头。Java对象头里的Mark Word里默认的存放的对象的Hashcode,分代年龄和锁标记位。32为JVM Mark Word默认存储结构为(注:java对象头以及下面的锁状态变化摘自《java并发编程的艺术》一书,该书我认为写的足够好,就没在自己组织语言班门弄斧了):

img

Mark Word存储结构

如图在Mark Word会默认存放hasdcode,年龄值以及锁标志位等信息。

Java SE 1.6中,锁一共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这几个状态会随着竞争情况逐渐升级。锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能降级成偏向锁。这种锁升级却不能降级的策略,目的是为了提高获得锁和释放锁的效率。对象的MarkWord变化为下图:

img

Mark Word状态变化

3.2 偏向锁

HotSpot的作者经过研究发现,大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁。

偏向锁的获取

当一个线程访问同步块并获取锁时,会在对象头栈帧中的锁记录里存储锁偏向的线程ID,以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需简单地测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下Mark Word中偏向锁的标识是否设置成1(表示当前是偏向锁):如果没有设置,则使用CAS竞争锁;如果设置了,则尝试使用CAS将对象头的偏向锁指向当前线程

偏向锁的撤销

偏向锁使用了一种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁。

img

偏向锁撤销流程

如图,偏向锁的撤销,需要等待全局安全点(在这个时间点上没有正在执行的字节码)。它会首先暂停拥有偏向锁的线程,然后检查持有偏向锁的线程是否活着,如果线程不处于活动状态,则将对象头设置成无锁状态;如果线程仍然活着,拥有偏向锁的栈会被执行,遍历偏向对象的锁记录,栈中的锁记录和对象头的Mark Word要么重新偏向于其他线程,要么恢复到无锁或者标记对象不适合作为偏向锁,最后唤醒暂停的线程。

下图线程1展示了偏向锁获取的过程,线程2展示了偏向锁撤销的过程。

img

偏向锁获取和撤销流程

如何关闭偏向锁

偏向锁在Java 6和Java 7里是默认启用的,但是它在应用程序启动几秒钟之后才激活,如有必要可以使用JVM参数来关闭延迟:**-XX:BiasedLockingStartupDelay=0。如果你确定应用程序里所有的锁通常情况下处于竞争状态,可以通过JVM参数关闭偏向锁:-XX:-UseBiasedLocking=false**,那么程序默认会进入轻量级锁状态

3.3 轻量级锁

加锁

线程在执行同步块之前,JVM会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中,官方称为Displaced Mark Word。然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁

解锁

轻量级解锁时,会使用原子的CAS操作将Displaced Mark Word替换回到对象头,如果成功,则表示没有竞争发生。如果失败,表示当前锁存在竞争,锁就会膨胀成重量级锁。下图是两个线程同时争夺锁,导致锁膨胀的流程图。

img

轻量级锁加锁解锁以及锁膨胀

因为自旋会消耗CPU,为了避免无用的自旋(比如获得锁的线程被阻塞住了),一旦锁升级成重量级锁,就不会再恢复到轻量级锁状态。当锁处于这个状态下,其他线程试图获取锁时,都会被阻塞住,当持有锁的线程释放锁之后会唤醒这些线程,被唤醒的线程就会进行新一轮的夺锁之争。

3.5 各种锁的比较

img

各种锁的对比

4. 一个例子

经过上面的理解,我们现在应该知道了该怎样解决了。更正后的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SynchronizedDemo implements Runnable {
private static int count = 0;

public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new SynchronizedDemo());
thread.start();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("result: " + count);
}

@Override
public void run() {
synchronized (SynchronizedDemo.class) {
for (int i = 0; i < 1000000; i++)
count++;
}
}
}

开启十个线程,每个线程在原值上累加1000000次,最终正确的结果为10X1000000=10000000,这里能够计算出正确的结果是因为在做累加操作时使用了同步代码块,这样就能保证每个线程所获得共享变量的值都是当前最新的值,如果不使用同步的话,就可能会出现A线程累加后,而B线程做累加操作有可能是使用原来的就值,即“脏值”。这样,就导致最终的计算结果不是正确的。而使用Syncnized就可能保证内存可见性,保证每个线程都是操作的最新值。这里只是一个示例性的demo,聪明的你,还有其他办法吗?

参考文献

《java并发编程的艺术》

作者:你听___
链接:https://www.jianshu.com/p/d53bf830fa09
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

背景
项目最近做了大的改动,添加了大量配置,Spring Boot项目启动时间从原来的100秒变成了400秒以上。从大量无效日志中一时难以看出瓶颈所在。首先需要一款性能分析软件,发现Intel® VTune™ Profiler对项目性能影响甚小。

性能分析
VTune是 Intel 提供的一个强大的应用程序性能分析软件,如何使用只需要看这两篇官方教程。(下载需要注册intel账号)
《在windows上面启动》https://software.intel.com/en-us/get-started-with-vtune-windows-os
《分析Java程序》https://software.intel.com/zh-cn/vtune-help-java-code-analysis
为了能够在VTune分析Java项目,首先创建一个启动脚本启动项目,然后在VTune创建一个性能分析项目开启性能监控。
start.bat脚本如下,很简单。

java -jar myboot.jar
1

项目启动后结束性能监控,VTune会自动分析监控结果,在Bottom-up Tab页可以看到哪些方法花费了大量CPU时间。
其中Spring的一个matches方法就占用了50秒。

org.springframework.boot.bind.DefaultPropertyNamePatternsMatcher.matches
1

性能瓶颈
查阅DefaultPropertyNamePatternsMatcher.matches源码及资料,matches的作用是根据我们在ConfigurationProperties中设置的prefix的值获取对应的匹配项。
由于分库分表后有多个主库与从库,Properties配置了大量不同的数据源。解析这些数据源配置耗费了大量时间,再加上解析后初始化数据源对象。

spring.datasource.url=jdbc:mysql://127.0.0.1:3310/test
spring.datasource.username=root
spring.datasource.password=123456
1
2
3
@Bean(name = “masterDataSource”)
@ConfigurationProperties(prefix = “spring.datasource”)
public DataSource masterDataSource() {
return DataSourceBuilder.create().type(dataSourceType).build();
}
1
2
3
4
5
根据Spring官方github,matches在匹配prefix值的过程中确实性能欠佳,尤其是prefix越短所花费的匹配时间越长。
详见:spring boot startup performance issue when large properties file

改善
matches性能问题在Spring Boot 2中已经得到改善,但项目升Spring Boot 2遥遥无期。
既然瓶颈在过多的prefix,首先可以减少prefix配置但治标不治本。最终解决方法为本地调试绕过Spring matches方法,改用测试插件去初始化数据源,将项目启动时间从400秒压缩为100秒左右。
————————————————
版权声明:本文为CSDN博主「tom3mao」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/j16421881/article/details/105444700

在 application.yml 中配置:

1
spring:  profiles:    active: @spring.profile@

其中, spring.profiles.active 配置项表示启用哪套环境,这里没有明确指定,而是用了占位符 @spring.profile@

不同的环境有不同的配置文件,例如以下有 dev、prod、test 三套环境:

在打包时,如果想要打 prod 环境的包,可以使用命令:

mvn clean package -Dmaven.test.skip=true -Dspring.profile=prod

打包后解压,再次查看 application.yml,占位符 @spring.profile@ 被替换成了 prod,即打包命令通过 -Dspring.profile=prod 传入的值:

1
spring:  profiles:    active: prod

原理

Spring Boot 项目通过 @..@ 指定占位符,参考文档:https://docs.spring.io/spring-boot/docs/2.1.4.RELEASE/reference/htmlsingle/#howto-automatic-expansion-maven

我们的 Spring Boot 项目都是继承自 spring-boot-starter-parent 模块,源码中可以找到:

配置占位符为 @:

https://github.com/spring-projects/spring-boot/blob/484050347a919e15c16885b14472e557461cccc9/spring-boot-project/spring-boot-starters/spring-boot-starter-parent/pom.xml#L19

并且使用了 maven-resources-plugin 插件

https://github.com/spring-projects/spring-boot/blob/484050347a919e15c16885b14472e557461cccc9/spring-boot-project/spring-boot-starters/spring-boot-starter-parent/pom.xml#L132

查看 maven-resources-plugin 插件的文档,可以看到这个插件可以在打包时通过 “-D” 选项,对项目中的占位符进行替换:

https://maven.apache.org/plugins/maven-resources-plugin/examples/filter.html

一、Slf4j

slf4j(Simple Logging Facade for Java)是日志框架的一种抽象,那么也就是说 slf4j 是不能单独使用的必须要有其他实现日志框架来配合使用,并且如果要启用slf4j框架要导入slf4j-api-xxx.jar 这个包, 这个包是slf4j 实现各种支持的日志框架的包。如 log4j、log4j2、logback等。

编码模式

1
2
3
4
5
6
7
8
9
10
11
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
...
...
private static final Logger logger = LoggerFactory.getLogger(T.class);

logger.trace();
logger.debug();
logger.info();
logger.warn();
logger.error();

注解模式

IDEA需要安装Lombok插件

pom文件引入lombok依赖

1
2
3
4
5
6
7
8
9
10
@slf4j
public class OauthApp {
...
...
logger.trace();
logger.debug();
logger.info();
logger.warn();
logger.error();
}

二、Logback

logback-spring.xml配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
?
<springProperty scope="context" name="springAppName" source="spring.application.name"/>
<springProperty scope="context" name="ELK_FILEBEAT_PATH" source="elk.filebeat_path" defaultValue="/PATHTO/log"/>
<springProperty scope="context" name="ELK_URL" source="elk.url" defaultValue="127.0.0.1"/>?
<springProperty scope="context" name="ELK_QUEUE_SIZE" source="elk.queue_size" defaultValue="8192"/>?
<!-- Example for logging into the build folder of your project -->
<property name="LOG_FILE" value="${BUILD_FOLDER:-build}/${springAppName}"/>?

<!-- You can override this to have a custom pattern -->
<property name="CONSOLE_LOG_PATTERN"
value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(%X{transNo}){faint} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

<!-- Appender to log to console -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<!-- Minimum logging level to be presented in the console logs-->
<level>DEBUG</level>
</filter>
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>

<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
<destination>${ELK_URL}</destination>
<queueSize>${ELK_QUEUE_SIZE}</queueSize>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"logdate":"%date{ISO8601}",
"severity": "%level",
"service": "${springAppName:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"parent": "%X{X-B3-ParentSpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message",
"transNo": "%X{transNo}"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>

<appender name="FILE_INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!-- daily rollover -->
<!--<fileNamePattern>C:/aaa/log/uw.%d{yyyy-MM-dd}.log</fileNamePattern>-->
<fileNamePattern>${ELK_FILEBEAT_PATH}/fin-info.%d{yyyy-MM-dd}-%i.log</fileNamePattern>
<maxFileSize>128MB</maxFileSize>
<!-- keep 30 days' worth of history capped at 3GB total size -->
<maxHistory>30</maxHistory>
<totalSizeCap>2GB</totalSizeCap>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<!--<timestamp>-->
<!--<timeZone>UTC</timeZone>-->
<!--</timestamp>-->
<pattern>
<pattern>
{
"logdate":"%date{ISO8601}",
"severity": "%level",
"service": "${springAppName:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"parent": "%X{X-B3-ParentSpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message",
"transNo": "%X{transNo}"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<appender name="FILE_DEBUG" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>DEBUG</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!-- daily rollover -->
<!--<fileNamePattern>C:/aaa/log/uw.%d{yyyy-MM-dd}.log</fileNamePattern>-->
<fileNamePattern>${ELK_FILEBEAT_PATH}/fin-debug.%d{yyyy-MM-dd}-%i.log</fileNamePattern>
<maxFileSize>128MB</maxFileSize>
<!-- keep 30 days' worth of history capped at 3GB total size -->
<maxHistory>30</maxHistory>
<totalSizeCap>2GB</totalSizeCap>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<!--<timestamp>-->
<!--<timeZone>UTC</timeZone>-->
<!--</timestamp>-->
<pattern>
<pattern>
{
"logdate":"%date{ISO8601}",
"severity": "%level",
"service": "${springAppName:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"parent": "%X{X-B3-ParentSpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message",
"transNo": "%X{transNo}"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<!--<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">-->
<!--<appender-ref ref="FILE" />-->
<!--</appender>-->
?
<root level="info">
<appender-ref ref="console"/>
<!-- uncomment this to have also JSON logs -->
<appender-ref ref="logstash"/>
<!-- <appender-ref ref="FILE" />-->
<!--<appender-ref ref="ASYNC"/>-->
</root>
<logger name="com.sinosoft" level="INFO" additivity="false">
<appender-ref ref="console"/>
<!-- uncomment this to have also JSON logs -->
<appender-ref ref="logstash"/>
<appender-ref ref="FILE_INFO" />
</logger>
<logger name="com.sinosoft" level="DEBUG" additivity="false">
<appender-ref ref="console"/>
<!-- uncomment this to have also JSON logs -->
<appender-ref ref="logstash"/>
<appender-ref ref="FILE_DEBUG" />
</logger>
</configuration>

属性配置:

  • elk.filebeat_path
  • elk.url
  • elk.queue_size

三、Log4j2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
<?xml version="1.0" encoding="UTF-8"?>
<!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
<!--status="WARN" :用于设置log4j2自身内部日志的信息输出级别,默认是OFF-->
<!--monitorInterval="30" :间隔秒数,自动检测配置文件的变更和重新配置本身-->
<configuration status="warn" monitorInterval="60" strict="true">
<properties>
<!--自定义一些常量,之后使用${变量名}引用-->
<property name="logpath">./logs</property>
<property name="charset">UTF-8</property>
<!--自定义的输出格式-->
<property name="pattern">%-d{yyyy-MM-dd HH:mm:ss.SSS}@@%p@@%X{ip}@@%t %C@@%X{requestId} %M %m %n </property>
</properties>
<!--appenders:定义输出内容,输出格式,输出方式,日志保存策略等,常用其下三种标签[console,File,RollingFile]-->
<!--Appender可以理解为日志的输出目的地-->
<appenders>
<!--console :控制台输出的配置-->
<Console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="${pattern}" charset="${charset}"/>
</Console>
<!--RollingRandomAccessFile性能比RollingFile提升官网宣称是20-200%-->
<RollingRandomAccessFile name="YZY.TRACE" immediateFlush="true" bufferSize="1024"
fileName="${logpath}/trace.log"
filePattern="${logpath}/trace.log.%d{yyyy-MM-dd}.gz">
<PatternLayout pattern="${pattern}" charset="${charset}"/>
<TimeBasedTriggeringPolicy/>
<DefaultRolloverStrategy>
<Delete basePath="${logpath}" maxDepth="2" followLinks="true">
<IfFileName glob="trace.log.*.gz"/>
<IfLastModified age="3d"/>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
<RollingRandomAccessFile name="YZY.SYSTEM" immediateFlush="true" bufferSize="4096"
fileName="${logpath}/system.log"
filePattern="${logpath}/system.log.%d{yyyy-MM-dd}.gz"
ignoreExceptions="false">
<!--引用上面自定义的输出格式-->
<PatternLayout pattern="${pattern}" charset="${charset}"/>
<Filters>
<!--ThresholdFilter :日志输出过滤-->
<!--level="info" :日志级别,onMatch="ACCEPT" :级别在info之上则接受,onMismatch="DENY" :级别在info之下则拒绝-->
<!--与logger、root中定义的日志级别相配合,相当于两个闸门,先判断logger、root的级别,符合了才会用到该filter中的level,此时再进行一次筛选-->
<ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY"/>
<!--<ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/>-->
<!--<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>-->
</Filters>
<!-- Policies :日志滚动策略-->
<Policies>
<!--<TimeBasedTriggeringPolicy interval="1" modulate="true"/>-->
<CronTriggeringPolicy schedule="0 0 2 * * ?" evaluateOnStartup="true"/>
</Policies>
<!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件-->
<DefaultRolloverStrategy>
<Delete basePath="${logpath}" maxDepth="2" followLinks="true">
<IfFileName glob="system.log.*.gz"/>
<!--只保留7天,超过则删除-->
<IfLastModified age="7d"/>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
<RollingRandomAccessFile name="YZY.ERROR" immediateFlush="true" bufferSize="4096"
fileName="${logpath}/error.log"
filePattern="${logpath}/error.log.%d{yyyy-MM-dd}.gz"
ignoreExceptions="false">
<PatternLayout pattern="${pattern}" charset="${charset}"/>
<Filters>
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<TimeBasedTriggeringPolicy/>
<DefaultRolloverStrategy>
<Delete basePath="${logpath}" maxDepth="2" followLinks="true">
<IfFileName glob="error.log.*.gz"/>
<IfLastModified age="7d"/>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
<RollingRandomAccessFile name="YZY.AUDIT" immediateFlush="false" bufferSize="8192"
fileName="${logpath}/audit.log"
filePattern="${logpath}/audit.log.%d{yyyy-MM-dd}.gz"
ignoreExceptions="false">
<PatternLayout pattern="${pattern}" charset="${charset}"/>
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<TimeBasedTriggeringPolicy/>
<DefaultRolloverStrategy>
<Delete basePath="${logpath}" maxDepth="2" followLinks="true">
<IfFileName glob="audit.log.*.gz"/>
<IfLastModified age="7d"/>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
<RollingRandomAccessFile name="YZY.POOL" immediateFlush="true" bufferSize="1024"
fileName="${logpath}/pool.log"
filePattern="${logpath}/pool.log.%d{yyyy-MM-dd}.gz"
ignoreExceptions="false">
<PatternLayout pattern="${pattern}" charset="${charset}"/>
<TimeBasedTriggeringPolicy/>
<DefaultRolloverStrategy>
<Delete basePath="${logpath}" maxDepth="2" followLinks="true">
<IfFileName glob="pool.log.*.gz"/>
<IfLastModified age="3d"/>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
<RollingRandomAccessFile name="YZY.MONITOR" immediateFlush="true" bufferSize="1024"
fileName="${logpath}/monitor.log"
filePattern="${logpath}/pool.log.%d{yyyy-MM-dd}.gz"
ignoreExceptions="false">
<PatternLayout pattern="${pattern}" charset="${charset}"/>
<TimeBasedTriggeringPolicy/>
<DefaultRolloverStrategy>
<Delete basePath="${logpath}" maxDepth="2" followLinks="true">
<IfFileName glob="pool.log.*.gz"/>
<IfLastModified age="3d"/>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
<RollingRandomAccessFile name="YZY.BIZ" immediateFlush="true"
fileName="${logpath}/biz.log"
filePattern="${logpath}/biz.log.%d{yyyy-MM-dd}.gz"
ignoreExceptions="false">
<PatternLayout pattern="${pattern}" charset="${charset}"/>
<TimeBasedTriggeringPolicy/>
<DefaultRolloverStrategy>
<Delete basePath="${logpath}" maxDepth="2" followLinks="true">
<IfFileName glob="biz.log.*.gz"/>
<IfLastModified age="7d"/>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
</appenders>

<!--然后定义logger,只有定义了logger并引入的appender,appender才会生效-->
<loggers>
<!--additivity="false"表示在该logger中输出的日志不会再延伸到父层logger。这里如果改为true,则会延伸到Root Logger,遵循Root Logger的配置也输出一次。-->
<Logger additivity="false" name="YZY.TRACE" level="INFO">
<AppenderRef ref="YZY.TRACE"/>
</Logger>
<Logger additivity="false" name="YZY.SYSTEM" level="INFO">
<AppenderRef ref="YZY.SYSTEM"/>
<AppenderRef ref="YZY.ERROR"/>
</Logger>
<Logger additivity="false" name="YZY.BIZ" level="INFO">
<AppenderRef ref="YZY.BIZ"/>
</Logger>
<!--Logger节点用来单独指定日志的形式,name为包路径,比如要为org.apache包下所有日志指定为INFO级别等。 -->
<Logger additivity="false" name="org.apache" level="INFO">
<AppenderRef ref="console"/>
</Logger>
<Logger additivity="false"
name="com.alibaba.dubbo.common.threadpool.monitor.MonitorPoolRunnable" level="INFO">
<AppenderRef ref="YZY.POOL"/>
</Logger>
<Logger additivity="false" name="com.alibaba.dubbo.monitor.dubbo.sfextend.SfMonitorExtend"
level="INFO">
<AppenderRef ref="YZY.MONITOR"/>
</Logger>
<!--针对,request以及reponse的信息配置输出级别,生产线请配置为error-->
<Logger additivity="true" name="com.alibaba.dubbo.rpc.protocol.rest.support" level="INFO">
<AppenderRef ref="console"/>
</Logger>
<!-- 在开发和测试环境启用,输出sql -->
<Logger additivity="true" name="com.YZY.mapper" level="DEBUG">
</Logger>
<!-- Root节点用来指定项目的根日志,如果没有单独指定Logger,那么就会默认使用该Root日志输出 -->
<Root level="DEBUG" includeLocation="true">
<AppenderRef ref="console"/>
<AppenderRef ref="YZY.SYSTEM"/>
<AppenderRef ref="YZY.ERROR"/>
<AppenderRef ref="YZY.AUDIT"/>
</Root>
</loggers>
</configuration>

四、性能对比结果

在这里插入图片描述