Kafka-Flink性能压测方法

2022-10-24

< view all posts

数据准备

介绍使用kafka bin目录中自带的 kafka-producer-perf-test.sh 脚本进行压测的方法。类似的方法也可以适用于对其它的Kafka下游系统进行压测。

首先需要准备用于压测的交易数据:

数据应该保存在文本文件中,每行是一条json记录。这里需要注意的是 kafka-producer-perf-test.sh 这个脚本在执行时会将所有数据读进内存,如果用于测试的交易数据量较大(例如1000w条),在读取数据这一步,测试程序就会出现OOM的情况。因此,在生成测试数据时,可以对数据进行分块,之后对不同的分块依次运行测试脚本。

*另一个方法是编辑 kafka-producer-perf-test.sh 这个脚本的内容,它在内部配置了 KAFKA_HEAP_OPTS 这个环境变量。可以修改它的值。

可以使用一个python脚本来生成测试数据,一些实用的写法:

1、生成随机的日期:

import random
from datetime import datetime, timedelta
initial_time = datetime(year=2022, month=6, day=1)
rand_time = initial_time + timedelta(seconds=random.randint(1, 60 * 400000))
time_str = rand_time.strftime("%Y%m%d%H%M%S")

2、等概率地取值:

source_id = random.choice(["01","02","03","04"])

3、非等概率取值:

resp_code = random.choices(["00","01", "02"], weights=(90,8,2))

4、生成随机的ASCII字符串:

import string
msg = "".join([random.choice(string.ascii_letters) for x in range(16)])

类似的,将 string.ascii_letters 替换为 string.octdigitsstring.hexdigits 可以得到随机的十/十六进制数字字符串。

5、用Memory Map读数据池:例如预先准备了某些测试数据,比如 id 列表,保存在文件中。如果非常大,读取到内存中并不是很方便的情况,可以使用Memory Map实现随机地读取任意行。具体方法可以参考《Python对大数据文件的读取》这篇笔记当中相应部分的内容。

执行测试

使用以下命令,替换加粗部分的内容即可:

nohup /.../kafka_x.x.x/bin/kafka-producer-perf-test.sh --topic xxx_topic --payload-file data.txt --num-records 34560000 --throughput 200 --producer-props bootstrap.servers=172.xx.xx.xx:9092,172.xx.xx.xx:9092 key.serializer=org.apache.kaf
ka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer &

性能监控

监控整机性能,可以使用 sar 命令,非常方便。如果机器上没有,可以用 yum install sysstat 安装。

命令格式: sar -u -r 间隔(秒) 轮次

例如: nohup sar -u -r 1 172800 > sys_monitor.txt & ,就会每秒1次,记录机器的cpu和内存使用情况,共记录172800次,输出到 sys_monitor.txt 文件当中。

监控单个进程,可以使用我写的这个python脚本(Gist地址):

# Monitors CPU and memory usage percentage of a given thread.
# Output file named under the pattern "{pid}_{ctime}_log.csv"
# Python3

# usage:
# `python ps_monitor.py pid interval limit`
#     pid: pid of the thread to monitor
#     interval: time interval between output records, in seconds
#     limit: stops recording once number of total records reaches the limit
#
# example:
# `python ps_monitor.py 18976 1 20`

import time
import psutil
import sys

def pslog(pid, interval, limit):
    filename = str(pid) + '_' + time.ctime() + '_log.csv'
    with open(filename, 'w') as f:
        f.write('time,CPU_percent,memory_percent\n')
        process = psutil.Process(pid)
        counter = 0
        while process.is_running() and counter < limit:
            f.write(time.ctime() + ',' +
                str(process.cpu_percent()) + ',' +
                str(process.memory_percent()) + '\n'
            )
            counter += 1
            time.sleep(interval)

pslog(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3]))

保存为 ps_monitor.py 文件后,运行: python ps_monitor.py 进程号 间隔(秒) 轮次。例如: python ps_monitor.py 18976 1 20 即可。