2022-10-24
< view all posts介绍使用kafka bin目录中自带的 kafka-producer-perf-test.sh 脚本进行压测的方法。类似的方法也可以适用于对其它的Kafka下游系统进行压测。
首先需要准备用于压测的交易数据:
数据应该保存在文本文件中,每行是一条json记录。这里需要注意的是 kafka-producer-perf-test.sh 这个脚本在执行时会将所有数据读进内存,如果用于测试的交易数据量较大(例如1000w条),在读取数据这一步,测试程序就会出现OOM的情况。因此,在生成测试数据时,可以对数据进行分块,之后对不同的分块依次运行测试脚本。
*另一个方法是编辑 kafka-producer-perf-test.sh 这个脚本的内容,它在内部配置了 KAFKA_HEAP_OPTS 这个环境变量。可以修改它的值。
可以使用一个python脚本来生成测试数据,一些实用的写法:
1、生成随机的日期:
import random from datetime import datetime, timedelta initial_time = datetime(year=2022, month=6, day=1) rand_time = initial_time + timedelta(seconds=random.randint(1, 60 * 400000)) time_str = rand_time.strftime("%Y%m%d%H%M%S")
2、等概率地取值:
source_id = random.choice(["01","02","03","04"])
3、非等概率取值:
resp_code = random.choices(["00","01", "02"], weights=(90,8,2))
4、生成随机的ASCII字符串:
import string msg = "".join([random.choice(string.ascii_letters) for x in range(16)])
类似的,将 string.ascii_letters 替换为 string.octdigits 和 string.hexdigits 可以得到随机的十/十六进制数字字符串。
5、用Memory Map读数据池:例如预先准备了某些测试数据,比如 id 列表,保存在文件中。如果非常大,读取到内存中并不是很方便的情况,可以使用Memory Map实现随机地读取任意行。具体方法可以参考《Python对大数据文件的读取》这篇笔记当中相应部分的内容。
使用以下命令,替换加粗部分的内容即可:
nohup /.../kafka_x.x.x/bin/kafka-producer-perf-test.sh --topic xxx_topic --payload-file data.txt --num-records 34560000 --throughput 200 --producer-props bootstrap.servers=172.xx.xx.xx:9092,172.xx.xx.xx:9092 key.serializer=org.apache.kaf ka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer &
监控整机性能,可以使用 sar 命令,非常方便。如果机器上没有,可以用 yum install sysstat 安装。
命令格式: sar -u -r 间隔(秒) 轮次
例如: nohup sar -u -r 1 172800 > sys_monitor.txt & ,就会每秒1次,记录机器的cpu和内存使用情况,共记录172800次,输出到 sys_monitor.txt 文件当中。
监控单个进程,可以使用我写的这个python脚本(Gist地址):
# Monitors CPU and memory usage percentage of a given thread. # Output file named under the pattern "{pid}_{ctime}_log.csv" # Python3 # usage: # `python ps_monitor.py pid interval limit` # pid: pid of the thread to monitor # interval: time interval between output records, in seconds # limit: stops recording once number of total records reaches the limit # # example: # `python ps_monitor.py 18976 1 20` import time import psutil import sys def pslog(pid, interval, limit): filename = str(pid) + '_' + time.ctime() + '_log.csv' with open(filename, 'w') as f: f.write('time,CPU_percent,memory_percent\n') process = psutil.Process(pid) counter = 0 while process.is_running() and counter < limit: f.write(time.ctime() + ',' + str(process.cpu_percent()) + ',' + str(process.memory_percent()) + '\n' ) counter += 1 time.sleep(interval) pslog(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3]))
保存为 ps_monitor.py 文件后,运行: python ps_monitor.py 进程号 间隔(秒) 轮次。例如: python ps_monitor.py 18976 1 20 即可。