0%

关于rocketMq消息消费,一般使用的是并发消费模式。

消息消费时要考虑的问题主要有以下几个

  1. 消息处理耗时 如果耗时过长,会导致消息积压。
  2. 幂等性,一般rocketMq消息不会重复,但是还是存在这种可能性。 所以需要对重复消息进行幂等处理
    1. 幂等的处理逻辑: 需要保证重复消息只处理一次,或者处理效果等效。 一般的处理方案是丢弃重复消息。
    2. 消息消费是记录消息的唯一id,每次消费消息时检查以下是否已经消费过,是的话直接标记消息已消费。
  3. 并发问题的控制。
    1. 一般方案:通过加锁,消息处理中唯一键要保证唯一性。
    2. 进行线程安全控制。
    3. 通过 insert on duplicate key update 方式: 此方式有个缺陷: db表的自增主键会在每次唯一键冲突时 增加。
    4. 例子: 通过消费支付消息,统计用户月度累计消费金额。
      1. 更新数据,基于用户id和月份,如果更新结果是0,表示记录不存在
      2. 进行数据插入操作。 如果插入失败,则说明由于并发其他线程已经插入数据。直接返回失败,待后续消息重新消费。
      3. 如果插入成功则直接继续后续流程。

最佳实践:

​ 新增&更新记录防并发策略:

  1. 先更新
    1. 成功直接返回
    2. 失败,则走 insert on duplicate key update 方式。(改方式存在db表自增主键不连续问题)

参考资料:

https://www.sevenyuan.cn/2020/11/18/jms/2020-11-18-RocketMQ-Learning-Client/

背景

产品需求:需要采集某个任务的详细数据。任务数据直接写redis。 考虑到不影响接口使用性能,所以没在接口调用时添加逻辑记录用户id。 计划使用 扫描redis的方式获取指定前缀key的所有记录。

关于scan的资料 见文尾

重点关注下 scan的缺点:

同一个元素可能会被返回多次。 处理重复元素的工作交由应用程序负责, 比如说, 可以考虑将迭代返回的元素仅仅用于可以安全地重复执行多次的操作上。
如果一个元素是在迭代过程中被添加到数据集的, 又或者是在迭代过程中从数据集中被删除的, 那么这个元素可能会被返回, 也可能不会, 这是未定义的(undefined)。
SCAN 命令每次执行返回的元素数量
增量式迭代命令并不保证每次执行都返回某个给定数量的元素。

增量式命令甚至可能会返回零个元素, 但只要命令返回的游标不是 0 , 应用程序就不应该将迭代视作结束。

结合我线上踩坑理解:

​ 1.线上数据会重复出现,代码要保证对幂等性的处理

​ 2.数据丢失问题,不能保证一定可以全量遍历所有key

1
2
3
4
5
6
7
8
9
10
11
12
SCAN命令的保证
SCAN 命令, 以及其他增量式迭代命令, 在进行完整遍历的情况下可以为用户带来以下保证:

从完整遍历开始直到完整遍历结束期间, 一直存在于数据集内的所有元素都会被完整遍历返回。
这意味着, 如果有一个元素, 它从遍历开始直到遍历结束期间都存在于被遍历的数据集当中, 那么 SCAN 命令总会在某次迭代中将这个元素返回给用户。
然而因为增量式命令仅仅使用游标来记录迭代状态, 所以这些命令带有以下缺点:

同一个元素可能会被返回多次。 处理重复元素的工作交由应用程序负责, 比如说, 可以考虑将迭代返回的元素仅仅用于可以安全地重复执行多次的操作上。

如果一个元素是在迭代过程中被添加到数据集的, 又或者是在迭代过程中从数据集中被删除的, 那么这个元素可能会被返回, 也可能不会, 这是未定义的(undefined)。


redis的scan命令详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184

SCAN 命令及其相关的 SSCAN 命令、 HSCAN 命令和 ZSCAN 命令都用于增量地迭代(incrementally iterate)一集元素(a collection of elements):

SCAN 命令用于迭代当前数据库中的数据库键。
SSCAN 命令用于迭代集合键中的元素。
HSCAN 命令用于迭代哈希键中的键值对。
ZSCAN 命令用于迭代有序集合中的元素(包括元素成员和元素分值)。
以上列出的四个命令都支持增量式迭代, 它们每次执行都只会返回少量元素, 所以这些命令可以用于生产环境, 而不会出现像 KEYS 命令、 SMEMBERS 命令带来的问题 —— 当 KEYS 命令被用于处理一个大的数据库时, 又或者 SMEMBERS 命令被用于处理一个大的集合键时, 它们可能会阻塞服务器达数秒之久。

不过, 增量式迭代命令也不是没有缺点的: 举个例子, 使用 SMEMBERS 命令可以返回集合键当前包含的所有元素, 但是对于 SCAN 这类增量式迭代命令来说, 因为在对键进行增量式迭代的过程中, 键可能会被修改, 所以增量式迭代命令只能对被返回的元素提供有限的保证 (offer limited guarantees about the returned elements)。

因为 SCAN 、 SSCAN 、 HSCAN 和 ZSCAN 四个命令的工作方式都非常相似, 所以这个文档会一并介绍这四个命令, 但是要记住:

SSCAN 命令、 HSCAN 命令和 ZSCAN 命令的第一个参数总是一个数据库键。
而 SCAN 命令则不需要在第一个参数提供任何数据库键 —— 因为它迭代的是当前数据库中的所有数据库键。
SCAN 命令的基本用法
SCAN 命令是一个基于游标的迭代器(cursor based iterator): SCAN 命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为 SCAN 命令的游标参数, 以此来延续之前的迭代过程。

当 SCAN 命令的游标参数被设置为 0 时, 服务器将开始一次新的迭代, 而当服务器向用户返回值为 0 的游标时, 表示迭代已结束。

以下是一个 SCAN 命令的迭代过程示例:

redis 127.0.0.1:6379> scan 0
1) "17"
2) 1) "key:12"
2) "key:8"
3) "key:4"
4) "key:14"
5) "key:16"
6) "key:17"
7) "key:15"
8) "key:10"
9) "key:3"
10) "key:7"
11) "key:1"

redis 127.0.0.1:6379> scan 17
1) "0"
2) 1) "key:5"
2) "key:18"
3) "key:0"
4) "key:2"
5) "key:19"
6) "key:13"
7) "key:6"
8) "key:9"
9) "key:11"
在上面这个例子中, 第一次迭代使用 0 作为游标, 表示开始一次新的迭代。

第二次迭代使用的是第一次迭代时返回的游标, 也即是命令回复第一个元素的值 —— 17 。

从上面的示例可以看到, SCAN 命令的回复是一个包含两个元素的数组, 第一个数组元素是用于进行下一次迭代的新游标, 而第二个数组元素则是一个数组, 这个数组中包含了所有被迭代的元素。

在第二次调用 SCAN 命令时, 命令返回了游标 0 , 这表示迭代已经结束, 整个数据集(collection)已经被完整遍历过了。

以 0 作为游标开始一次新的迭代, 一直调用 SCAN 命令, 直到命令返回游标 0 , 我们称这个过程为一次完整遍历(full iteration)。

SCAN 命令的保证(guarantees)
SCAN 命令, 以及其他增量式迭代命令, 在进行完整遍历的情况下可以为用户带来以下保证: 从完整遍历开始直到完整遍历结束期间, 一直存在于数据集内的所有元素都会被完整遍历返回; 这意味着, 如果有一个元素, 它从遍历开始直到遍历结束期间都存在于被遍历的数据集当中, 那么 SCAN 命令总会在某次迭代中将这个元素返回给用户。

然而因为增量式命令仅仅使用游标来记录迭代状态, 所以这些命令带有以下缺点:

同一个元素可能会被返回多次。 处理重复元素的工作交由应用程序负责, 比如说, 可以考虑将迭代返回的元素仅仅用于可以安全地重复执行多次的操作上。
如果一个元素是在迭代过程中被添加到数据集的, 又或者是在迭代过程中从数据集中被删除的, 那么这个元素可能会被返回, 也可能不会, 这是未定义的(undefined)。
SCAN 命令每次执行返回的元素数量
增量式迭代命令并不保证每次执行都返回某个给定数量的元素。

增量式命令甚至可能会返回零个元素, 但只要命令返回的游标不是 0 , 应用程序就不应该将迭代视作结束。

不过命令返回的元素数量总是符合一定规则的, 在实际中:

对于一个大数据集来说, 增量式迭代命令每次最多可能会返回数十个元素;
而对于一个足够小的数据集来说, 如果这个数据集的底层表示为编码数据结构(encoded data structure,适用于是小集合键、小哈希键和小有序集合键), 那么增量迭代命令将在一次调用中返回数据集中的所有元素。
最后, 用户可以通过增量式迭代命令提供的 COUNT 选项来指定每次迭代返回元素的最大值。

COUNT 选项
虽然增量式迭代命令不保证每次迭代所返回的元素数量, 但我们可以使用 COUNT 选项, 对命令的行为进行一定程度上的调整。

基本上, COUNT 选项的作用就是让用户告知迭代命令, 在每次迭代中应该从数据集里返回多少元素。

虽然 COUNT 选项只是对增量式迭代命令的一种提示(hint), 但是在大多数情况下, 这种提示都是有效的。

COUNT 参数的默认值为 10 。
在迭代一个足够大的、由哈希表实现的数据库、集合键、哈希键或者有序集合键时, 如果用户没有使用 MATCH 选项, 那么命令返回的元素数量通常和 COUNT 选项指定的一样, 或者比 COUNT 选项指定的数量稍多一些。
在迭代一个编码为整数集合(intset,一个只由整数值构成的小集合)、 或者编码为压缩列表(ziplist,由不同值构成的一个小哈希或者一个小有序集合)时, 增量式迭代命令通常会无视 COUNT 选项指定的值, 在第一次迭代就将数据集包含的所有元素都返回给用户。
并非每次迭代都要使用相同的 COUNT 值。
用户可以在每次迭代中按自己的需要随意改变 COUNT 值, 只要记得将上次迭代返回的游标用到下次迭代里面就可以了。

MATCH 选项
和 KEYS 命令一样, 增量式迭代命令也可以通过提供一个 glob 风格的模式参数, 让命令只返回和给定模式相匹配的元素, 这一点可以通过在执行增量式迭代命令时, 通过给定 MATCH <pattern> 参数来实现。

以下是一个使用 MATCH 选项进行迭代的示例:

redis 127.0.0.1:6379> sadd myset 1 2 3 foo foobar feelsgood
(integer) 6

redis 127.0.0.1:6379> sscan myset 0 match f*
1) "0"
2) 1) "foo"
2) "feelsgood"
3) "foobar"
需要注意的是, 对元素的模式匹配工作是在命令从数据集中取出元素之后, 向客户端返回元素之前的这段时间内进行的, 所以如果被迭代的数据集中只有少量元素和模式相匹配, 那么迭代命令或许会在多次执行中都不返回任何元素。

以下是这种情况的一个例子:

redis 127.0.0.1:6379> scan 0 MATCH *11*
1) "288"
2) 1) "key:911"

redis 127.0.0.1:6379> scan 288 MATCH *11*
1) "224"
2) (empty list or set)

redis 127.0.0.1:6379> scan 224 MATCH *11*
1) "80"
2) (empty list or set)

redis 127.0.0.1:6379> scan 80 MATCH *11*
1) "176"
2) (empty list or set)

redis 127.0.0.1:6379> scan 176 MATCH *11* COUNT 1000
1) "0"
2) 1) "key:611"
2) "key:711"
3) "key:118"
4) "key:117"
5) "key:311"
6) "key:112"
7) "key:111"
8) "key:110"
9) "key:113"
10) "key:211"
11) "key:411"
12) "key:115"
13) "key:116"
14) "key:114"
15) "key:119"
16) "key:811"
17) "key:511"
18) "key:11"
如你所见, 以上的大部分迭代都不返回任何元素。

在最后一次迭代, 我们通过将 COUNT 选项的参数设置为 1000 , 强制命令为本次迭代扫描更多元素, 从而使得命令返回的元素也变多了。

并发执行多个迭代
在同一时间, 可以有任意多个客户端对同一数据集进行迭代, 客户端每次执行迭代都需要传入一个游标, 并在迭代执行之后获得一个新的游标, 而这个游标就包含了迭代的所有状态, 因此, 服务器无须为迭代记录任何状态。

中途停止迭代
因为迭代的所有状态都保存在游标里面, 而服务器无须为迭代保存任何状态, 所以客户端可以在中途停止一个迭代, 而无须对服务器进行任何通知。

即使有任意数量的迭代在中途停止, 也不会产生任何问题。

使用错误的游标进行增量式迭代
使用间断的(broken)、负数、超出范围或者其他非正常的游标来执行增量式迭代并不会造成服务器崩溃, 但可能会让命令产生未定义的行为。

未定义行为指的是, 增量式命令对返回值所做的保证可能会不再为真。

只有两种游标是合法的:

在开始一个新的迭代时, 游标必须为 0 。
增量式迭代命令在执行之后返回的, 用于延续(continue)迭代过程的游标。
迭代终结的保证
增量式迭代命令所使用的算法只保证在数据集的大小有界(bounded)的情况下, 迭代才会停止, 换句话说, 如果被迭代数据集的大小不断地增长的话, 增量式迭代命令可能永远也无法完成一次完整迭代。

从直觉上可以看出, 当一个数据集不断地变大时, 想要访问这个数据集中的所有元素就需要做越来越多的工作, 能否结束一个迭代取决于用户执行迭代的速度是否比数据集增长的速度更快。

可用版本:

>= 2.8.0
时间复杂度:

增量式迭代命令每次执行的复杂度为 O(1) , 对数据集进行一次完整迭代的复杂度为 O(N) , 其中 N 为数据集中的元素数量。
返回值:

SCAN 命令、 SSCAN 命令、 HSCAN 命令和 ZSCAN 命令都返回一个包含两个元素的 multi-bulk 回复: 回复的第一个元素是字符串表示的无符号 64 位整数(游标), 回复的第二个元素是另一个 multi-bulk 回复, 这个 multi-bulk 回复包含了本次被迭代的元素。

SCAN 命令返回的每个元素都是一个数据库键。

SSCAN 命令返回的每个元素都是一个集合成员。

HSCAN 命令返回的每个元素都是一个键值对,一个键值对由一个键和一个值组成。

ZSCAN 命令返回的每个元素都是一个有序集合元素,一个有序集合元素由一个成员(member)和一个分值(score)组成。

使用 git archive 可以导出干净的代码,具体可以看 git

help archive,简单使用如下:

1
$git archive --format zip --output "./output.zip" master -0

命令:

gunzip archive.gz

使用:

.gz 是文件在Linux中进行gzip压缩。 要提取的文件名为.gz的文件,我们用gunzip命令。 首先使用下面的命令,创建 gzip access.log文件的(.gz )存档。请记住,下面的命令将删除原始文件。

1
# gzip access.log

上面的命令将在当前目录中产生名为access.log.gz 一个存档文件。

1
2
# ls -l access.log.gz
-rw-r--r-- 1 root root 37 Sep 14 04:02 access.log.gz

现在用gunzip命令解压access.log.gz文件。这将从存档中提取文件,并自动删除.GZ文件。

1
# gunzip access.log.gz

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import random
import string

from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route('/api', methods=['GET'])
def get_port():
"""
无参
curl -X GET 'http://192.168.1.110:8001/api'
"""
return jsonify({'port': '8001'})


@app.route('/api/num', methods=['GET'])
def get_num():
"""
无参
curl -X GET 'http://192.168.1.110:8001/api/num'
"""
return {'number': random.randint(100, 5000)}


@app.route('/api/msg', methods=['GET'])
def get_msg():
"""
接收form表单传递的参数
curl --location --request GET 'http://192.168.1.110:8001/api/msg' --form 'name=zsx'
curl -X GET 'http://192.168.1.110:8001/api/msg' --form 'name=zsx'
"""
return {'msg': f"hello, {request.form['name']}"}


@app.route('/api/age', methods=['GET'])
def get_age():
"""
接收'x-www-form-urlencoded'传递的参数
curl --location --request GET 'http://192.168.1.110:8001/api/age' \
--header 'Content-Type: application/x-www-form-urlencoded' \
--data-urlencode 'id=1'

curl -X GET --data-urlencode "id=1" 'http://192.168.1.110:8001/api/age'
"""
user_id = request.form.get('id')
if user_id:
age = random.randint(18, 100)
else:
age = None
return {'age': age}


@app.route('/api/name', methods=['GET'])
def get_name():
"""
接收url传递的参数
curl --location --request GET 'http://192.168.1.110:8001/api/name?id=1'
curl -X GET 'http://192.168.1.110:8001/api/name?id=1'
"""
try:
# 方式1
# params = parse.parse_qs(parse.urlparse(request.url).query)
# user_id = params['id'][0].strip()

# 方式2
user_id = request.args.get("id").strip()

if user_id == '1':
name = "zsx"
else:
name = ''.join(random.sample(string.ascii_letters + string.digits, 5))
except Exception as e:
print(e)
# raise e
return {'msg': 'error'}
else:
return {'msg': name}


@app.route('/api/data', methods=['GET'])
def get_data():
"""
接收传递的JSON参数
curl --location --request GET 'http://192.168.1.110:8001/api/data' \
--header 'Content-Type: application/json' \
--data-raw '{"id": 1, "name": "lisi"}'

curl --header "Content-Type:application/json" \
-X GET --data '{"id": 1, "name": "lisi"}' \
http://192.168.1.110:8001/api/data
"""
if not request.json:
return jsonify({'msg': 'data is null'})
elif 'id' not in request.json:
return {'msg': 'id is null'}
elif 'name' not in request.json:
return {'msg': 'name is null'}
try:
user_id = request.json['id']
name = request.json['name']
except Exception as e:
print(e)
# raise e
return {'msg': 'error'}
else:
return {'id': user_id, 'name': name, 'age': random.randint(18, 100)}


if __name__ == '__main__':
app.run(host='0.0.0.0', port=8001, debug=True)

img

CentOS是经常使用的Linux系统之一,特别是作为服务器使用,其只自带了Python2,但是现在使用更广泛的是Python3,因此需要自行安装,同时为了更方便地安装第三方库,还需要安装pip3。

一、安装相关依赖

1.安装环境依赖:

1
yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel

2.安装gcc编译器(有可能已经安装)

1
yum install gcc -y

二、安装Python3

以Python3.7为例讲解。

1.下载Python安装包

1
wget https://www.python.org/ftp/python/3.7.4/Python-3.7.4.tgz

因为下载很慢,所以可以在本地通过更快的方式下载后再上传到服务器。

2.将安装包移动到/usr/local文件夹下

1
mv Python-3.7.4.tgz /usr/local/

3.在local目录下创建Python3目录

1
mkdir /usr/local/python3

4.进入的Python安装包压缩包所在的目录

1
cd /usr/local/

5.解压安装包

1
tar -xvf Python-3.7.4.tgz

6.进入解压后的目录

1
cd /usr/local/Python-3.7.4/

7.配置安装目录

1
./configure --prefix=/usr/local/python3

8.编译源码

1
make

9.执行源码安装

1
make install

这一步可能会出现报错ModuleNotFoundError: No module named '_ctypes',这是因为缺少依赖包libffi-devel,解决方法可参考https://blog.csdn.net/CUFEECR/article/details/103093951。

10.创建软连接

1
ln -s /usr/local/python3/bin/python3  /usr/bin/python3

11.测试 输入python3打印:

1
2
3
4
Python 3.7.4 (default, Sep  6 2020, 09:22:23) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>

即说明Python安装成功。

二、安装pip3

1.安装依赖(非必要)

1
2
sudo yum install openssl-devel -y 
sudo yum install zlib-devel -y

2.安装setuptools

1
2
3
4
5
6
7
8
9
10
# 下载安装文件
wget --no-check-certificate https://pypi.python.org/packages/source/s/setuptools/setuptools-19.6.tar.gz#md5=c607dd118eae682c44ed146367a17e26

# 解压
tar -zxvf setuptools-19.6.tar.gz
cd setuptools-19.6

# 执行安装
sudo python3 setup.py build
sudo python3 setup.py install

3.安装pip3

1
2
3
4
5
6
7
8
9
10
# 下载安装文件
wget --no-check-certificate https://pypi.python.org/packages/source/p/pip/pip-20.2.2.tar.gz#md5=3a73c4188f8dbad6a1e6f6d44d117eeb

# 解压
tar -zxvf pip-20.2.2.tar.gz
cd pip-20.2.2

# 执行安装
python3 setup.py build
sudo python3 setup.py install

4.测试 安装完成后,输入pip3 -V,打印:

1
pip 20.2.2 from /usr/local/python3/lib/python3.7/site-packages/pip (python 3.7)

则说明安装成功,可以正常安装需要的第三方库了,需要注意: 在使用时应该是pip3 xxx,而不是pip xxx,使之与Python2相区别。

精选文章 CentOS7 安装Nginx+MySQL

CentOS7 安装 MySQL

安装MySql

CentOS7的yum源中默认好像是没有mysql的。为了解决这个问题,我们要先下载mysql的repo源。

\1. 下载mysql的repo源

$ wget http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm

\2. 安装mysql-community-release-el7-5.noarch.rpm包

$ sudo rpm -ivh mysql-community-release-el7-5.noarch.rpm

安装这个包后,会获得两个mysql的yum repo源:/etc/yum.repos.d/mysql-community.repo,/etc/yum.repos.d/mysql-community-source.repo。

\3. 安装mysql

$ sudo yum install mysql-server

转载于:https://www.cnblogs.com/tchroot/p/6551523.html

安装zsh 和 oh-my-zsh

  1. 安装zsh
    yum install zsh
  2. 安装git
    yum install git
  3. 切换默认shell
    chsh -s /bin/zsh
  4. clone from GitHub
    git clone git://github.com/robbyrussell/oh-my-zsh.git ~/.oh-my-zsh
  5. 复制默认.zshrc
    cp ~/.oh-my-zsh/templates/zshrc.zsh-template ~/.zshrc

安装插件

语法高亮插件

  1. 安装
    git clone https://github.com/zsh-users/zsh-syntax-highlighting.git ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-syntax-highlighting
  2. 配置
    在~/.zshrc的plugins中加入zsh-syntax-highlighting

自动补全插件

  1. 安装
    git clone git://github.com/zsh-users/zsh-autosuggestions $ZSH_CUSTOM/plugins/zsh-autosuggestions
  2. 配置
    在~/.zshrc的plugins中加入zsh-autosuggestions

  1. 打开url选择jdk1.8下载

官网地址

  1. 下载(友情提示:如果下载链接失效,那就自己换一个,或者下载到本地在上传到安装目录解压)

wget https://download.oracle.com/otn/java/jdk/8u261-b12/a4634525489241b9a9e1aa73d9e118e6/jdk-8u261-linux-x64.tar.gz?AuthParam=1595227231_c973dac5c59012a4184f32776ff66778

  1. 安装

创建安装目录
mkdir /usr/local/java/

解压至安装目录
tar -zxvf jdk-8u251-linux-x64.tar.gz -C /usr/local/java/

  1. 设置环境变量

打开文件
vim /etc/profile

在末尾添加(按i编辑)
export JAVA_HOME=/usr/local/java/jdk1.8.0_251
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

使环境变量生效
source /etc/profile

添加软链接
ln -s /usr/local/java/jdk1.8.0_251/bin/java /usr/bin/java

检查
java -version

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1.安装:
# 1. 安装docker-compose,需要预先安装好Docker
sudo curl -L "https://github.com/docker/compose/releases/download/1.27.4/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# 增加可执行权限
sudo chmod +x /usr/local/bin/docker-compose
# 2. 创建相关文件夹
mkdir -p /opt/zookeeper && mkdir -p /opt/kafka
# 3. 准备配置文件
echo '
version: '3.7'
services:
zookeeper:
image: wurstmeister/zookeeper
volumes:
- /opt/zookeeper/data:/data
container_name: zookeeper
mem_limit: 1024M
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
restart: always
kafka_node1:
image: wurstmeister/kafka
container_name: kafka_node1
mem_limit: 1024M
depends_on:
- zookeeper
ports:
- 9092:9092
volumes:
- /opt/kafka/data:/kafka
environment:
KAFKA_CREATE_TOPICS: "test"
KAFKA_BROKER_NO: 0
KAFKA_LISTENERS: PLAINTEXT://kafka_node1:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${kafka_service_public_ip}:${kafka_service_public_port}
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_HEAP_OPTS: "-Xmx512M -Xms16M"
restart: always
kafka_manager:
image: hlebalbau/kafka-manager:stable
ports:
- 9000:9000
environment:
ZK_HOSTS: "zookeeper:2181"
depends_on:
- zookeeper
- kafka_node1
restart: always' > /etc/kafka/kafka.yml
# 3. 启动容器服务
docker-compose -f /etc/kafka/kafka.yml up -d

PS需要注意env配置信息

docker-compose的配置文件中变量配置方法

https://docs.docker.com/compose/environment-variables/

需要在配置文件所在目录创建 .env文件,

文件中添加变量既可。

2 使用Kafka-Mgr管理集群

访问服务所在主机的9000端口,按如下图所示创建集群:

img

  • 若出现 Yikes! KeeperErrorCode = Unimplemented for /kafka-manager/mutex Try again. 错误,参考此issue解决,具体步骤如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # docker exec -it zookeeper bash
    root@98747a9eac65:/zookeeper-3.4.14# ./bin/zkCli.sh
    [zk: localhost:2181(CONNECTED) 2] ls /kafka-manager
    [configs, deleteClusters, clusters]
    [zk: localhost:2181(CONNECTED) 3] create /kafka-manager/mutex ""
    Created /kafka-manager/mutex
    [zk: localhost:2181(CONNECTED) 5] create /kafka-manager/mutex/locks ""
    Created /kafka-manager/mutex/locks
    [zk: localhost:2181(CONNECTED) 6] create /kafka-manager/mutex/leases ""
    Created /kafka-manager/mutex/leases

参考文档:

https://kafka.apache.org/0110/documentation.html#brokerconfigs

https://github.com/wurstmeister/kafka-docker/issues/233