目录

【Python】使用Beautiful Soup等三种方式定制Jmeter测试脚本

背景介绍

我们在做性能调优时,时常需要根据实际压测的情况,调整线程组的参数,比如循环次数,线程数,所有线程启动的时间等。 如果是在一台Linux机器上,就免不了在本机打开图形页面修改,然后最后传递到压测机上面的过程,所有为了解决这个业务痛点 ,使用Python写了一个能直接修改Jmeter基础压测参数的脚本,能修改jmx脚本的线程组数、循环次数、线程组全部启动需要花的时间。

实现思路

刚开始准备写这个脚本的时候,想了两个思路:

把脚本数据读出,使用正则表达式(re库)匹配关键数据进行修改

优点:可以快速的改写数据 缺点:无法进行区块的修改

把脚本数据读出,使用BeautifulSoup的xml解析功能解析后修改

注:我们的Jmx脚本其实就是一个标准格式的xml

优点: 能快速的查找元素并进行修改 缺点: 需要熟悉BeautifulSoup的用法

通过Beautiful Soup

Beautiful Soup

Beautiful Soup 是一个可以从HTML或XML文件中提取数据的Python库.我们使用BeautifulSoup解析xml或者html的时候,能够得到一个 BeautifulSoup 的对象,我们可以通过操作这个对象来完成原始数据的结构化数据。具体的使用可以参照这份文档

具体实现

主要使用了bs4的soup.findself.soup.find_all功能。结化或数据的修改如loops.string = num

值得注意的是,find_all支持正则匹配,甚至如果没有合适过滤器,那么还可以定义一个方法,方法只接受一个元素参数。

修改后的脚本将以"T{}L{}R{}-{}_{}.jmx".format(thread_num, loop_num, ramp_time, self.src_script, self.get_time()) 的形式保存,具体封装如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import time
import os
from bs4 import BeautifulSoup

class OpJmx:
    def __init__(self, file_name):
        self.src_script = self._split_filename(file_name)
        with open(file_name, "r") as f:
            data = f.read()
        self.soup = BeautifulSoup(data, "xml")

    @staticmethod
    def _split_filename(filename):
        """
        新生成的文件兼容传入相对路径及文件名称
        :param filename:
        :return:
        """
        relative = filename.split("/")
        return relative[len(relative)-1].split(".jmx")[0]

    def _theard_num(self):
        """
        :return: 线程数据对象
        """
        return self.soup.find("stringProp", {"name": {"ThreadGroup.num_threads"}})

    def _ramp_time(self):
        """
        :return: 启动所有线程时间配置对象
        """
        return self.soup.find("stringProp", {"name": {"ThreadGroup.ramp_time"}})

    def _bean_shell(self):
        """
        :return:  bean_shell对象
        """
        return self.soup.find("stringProp", {"name": {"BeanShellSampler.query"}})

    def _paths(self):
        """
        :return: 请求路径信息对象
        """
        return self.soup.find_all("stringProp", {"name": {"HTTPSampler.path"}})

    def _methods(self):
        """
        :return: 请求方法对象
        """
        return self.soup.find_all("stringProp", {"name": {"HTTPSampler.method"}})

    def _argument(self):
        """
        :return: post请求参数对象
        """
        # Argument.value 不唯一 通过HTTPArgument.always_encode找到
        return self.soup.find_all("boolProp", {"name": {"HTTPArgument.always_encode"}})[0].find_next()

    def _loops(self):
        """
        循环次数,兼容forever 与具体次数
        :return: 循环次数对象
        """
        _loops = self.soup.find("stringProp", {"name": {"LoopController.loops"}})
        if _loops:
            pass
        else:
            _loops = self.soup.find("intProp", {"name": {"LoopController.loops"}})

        return _loops

    @staticmethod
    def get_time():
        return time.strftime("%Y-%m-%d@%X", time.localtime())

    def get_bean_shell(self):
        _str = self._bean_shell().string
        logger.info("bean_shell: " + _str)
        return _str

    def set_bean_shell(self, new_bean_shell):
        old_bean_shell = self._bean_shell()
        old_bean_shell.string = new_bean_shell

    def get_ramp_time(self):
        _str = self._ramp_time().string
        logger.info("ramp_time: " + _str)
        return _str

    @check_num
    def set_ramp_time(self, num):
        loops = self._ramp_time()
        loops.string = num

    def get_loops(self):
        _str = self._loops().string
        logger.info("loops: " + _str)
        return _str

    @check_num
    def set_loops(self, num):
        """
        :param num: -1 为一直循环,其他为具体循环次数
        :return:
        """
        loops = self._loops()
        loops.string = num

    def get_argument(self):
        _str = self._argument().string
        logger.info("argument: " + _str)
        return _str

    def set_argument(self, **kwargs):
        """
        设置请求参数(JSON,传入字典)
        :param kwargs:
        :return:
        """
        param = self._argument()
        param.string = str(kwargs)

    def get_thread_num(self):
        _str = self._theard_num().string
        logger.info("thread_num: " + _str)
        return _str

    @check_num
    def set_thread_num(self, num):
        """
        设置线程数信息
        :param num:
        :return:
        """
        thread_num = self._theard_num()
        thread_num.string = num
        # print(self.soup.find_all("stringProp", {"name": {"ThreadGroup.num_threads"}})[0].string)

    def mod_header(self, key, value, index=0):
        """
        修改指定header的信息,默认修改第一个值
        :param key:
        :param value:
        :param index:
        :return:
        """
        headers = self.soup.find_all("elementProp", {"elementType": {"Header"}})
        headers[index].find("stringProp", {"name": {"Header.name"}}).string = key
        headers[index].find("stringProp", {"name": {"Header.value"}}).string = value
        # for header in headers:
        #     header.find("stringProp", {"name": {"Header.name"}}).string = key
        #     header.find("stringProp", {"name": {"Header.value"}}).string = value

    def save_jmx(self):
        logger.info("参数设置完毕,开始保存数据")
        cur_path = os.path.dirname(os.path.realpath(__file__))
        thread_num = self.get_thread_num()
        loop_num = self.get_loops()
        ramp_time = self.get_ramp_time()

        script_name = "T{}L{}R{}-{}_{}.jmx".format(thread_num, loop_num, ramp_time, self.src_script, self.get_time())
        script_path = os.path.join(cur_path, '..', 'script')

        if not os.path.exists(script_path):
            os.mkdir(script_path)

        script_location = os.path.join(script_path, script_name)
        logger.info("测试脚本已保存于 {}".format(script_location))
        with open(script_location, "w") as f:
            f.write(str(self.soup))

        return script_name
if __name__ == '__main__':
    jmx = OpJmx("templates/template.jmx")
    argvs = sys.argv
    len_argvs = len(argvs) - 1
    if len_argvs == 0:
        pass
    elif len_argvs == 1:
        jmx.set_thread_num(argvs[1])
    elif len_argvs == 2:
        jmx.set_thread_num(argvs[1])
        jmx.set_loops(argvs[2])
    elif len_argvs == 3:
        jmx.set_thread_num(argvs[1])
        jmx.set_loops(argvs[2])
        jmx.set_ramp_time(argvs[3])
    jmx.save_jmx()

未完待续…

使用string.Template字符替换

如果只是简单的字符串替换,使用 format 或者 %s 也能完成,选择使用string.Template的原因是string.Template可以自动化匹配规则,且能修改操作符, 而不管是fstring还是format都是用的{}来进行关键字的定位,{}在jmx脚本中本身就存在特定的意义。

思路:

  • 修改jmx脚本中的关键数据,使用特定操作符
  • 定义相关字典,使用safe_substitute进行赋值

具体实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#! /usr/bin/python
# coding:utf-8 
""" 
@author:Bingo.he 
@file: str_temp.py 
@time: 2019/08/20 
"""
import string

# with open("template_str.jmx", "r") as f:
#     data = f.read()
set_value = {
    "num_threads": 10,
    "loops": 1011,
    "ramp_time": 10
}
str_temp = """
  <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Thread Group" enabled="true">
    <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
    <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
      <boolProp name="LoopController.continue_forever">false</boolProp>
      <stringProp name="LoopController.loops">%loops</stringProp>
    </elementProp>
    <stringProp name="ThreadGroup.num_threads">%num_threads</stringProp>
    <stringProp name="ThreadGroup.ramp_time">%ramp_time</stringProp>
    <boolProp name="ThreadGroup.scheduler">false</boolProp>
    <stringProp name="ThreadGroup.duration"></stringProp>
    <stringProp name="ThreadGroup.delay"></stringProp>
  </ThreadGroup>
"""


class MyTemplate(string.Template):
    # 修改操作符为"%"
    delimiter = '%'
    # 修改匹配规则(正则)
    # idpattern = '[a-z]+_[a-z]+'


t = MyTemplate(str_temp)

print(t.safe_substitute(set_value))

输出:

1
2
3
4
5
6
...
  <stringProp name="LoopController.loops">1011</stringProp>
</elementProp>
<stringProp name="ThreadGroup.num_threads">101</stringProp>
<stringProp name="ThreadGroup.ramp_time">10</stringProp>
...

使用re.sub

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
str_temp = """
  <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Thread Group" enabled="true">
    <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
    <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
      <boolProp name="LoopController.continue_forever">false</boolProp>
      <stringProp name="LoopController.loops">$loops</stringProp>
    </elementProp>
    <stringProp name="ThreadGroup.num_threads">$num_threads</stringProp>
    <stringProp name="ThreadGroup.ramp_time">$ramp_time</stringProp>
    <boolProp name="ThreadGroup.scheduler">false</boolProp>
    <stringProp name="ThreadGroup.duration"></stringProp>
    <stringProp name="ThreadGroup.delay"></stringProp>
  </ThreadGroup>
"""

str_l = re.sub(r"\$loops", "101", str_temp)
str_t = re.sub(r"\$num_threads", "102", str_l)
str_r = re.sub(r"\$ramp_time", "103", str_t)

print(str_r)

输出:

1
2
3
4
5
6
7
···
      <boolProp name="LoopController.continue_forever">false</boolProp>
      <stringProp name="LoopController.loops">101</stringProp>
    </elementProp>
    <stringProp name="ThreadGroup.num_threads">102</stringProp>
    <stringProp name="ThreadGroup.ramp_time">103</stringProp>
···

延展

相信大家也注意到了,我们每替换一个参数都需要调用一次re.sub,而且要将上一次调用的输出作为下一次的输入,像极了递归调用。但是我们今天不介绍递归改写的方法,而是使用闭包的方式,具体的例子如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import re


def multiple_replace(text, adict):
    rx = re.compile('|'.join(map(re.escape, adict)))

    def one_xlat(match):
        return adict[match.group(0)]

    return rx.sub(one_xlat, text)  # 每遇到一次匹配就会调用回调函数


# 把key做成了 |分割的内容,也就是正则表达式的OR
map1 = {'1': '2', '3': '4', '5': '6'}
_str = '113355'
print(multiple_replace(_str, map1))