用Python编写WEB服务器压力测试工

发布时间:2019-08-11 11:27:12编辑:auto阅读(1368)

    前言

    最近在编写一个简单的WEB服务器,一个日常的工作就是测试服务器的性能,试用了MS的Web Application Stress,发现它居然不支持除80以外端口的测试,其他的如Loadrunner 太贵而且太大,试用版只支持10个并发用户,我Google到了100个并发用户的许可想试用一下,不过没有安装成功。想想这种压力测试实际上没啥技术含量,就自己用Python来编写了小段测试代码。

    使用Python的原因

    毫无疑问,编写这样的代码使用Python最合适,使用C/C++编写有点小题大做,使用C#编写编译又很麻烦,我是使用Editplus来写代码的,因为要考虑做不同的测试,只能边写边调整,使用Python,下载一个Python的加亮文件,设置User Tool 1 到 Python,运行的时候只需要按Ctrl+1,着实方便的很。

    压力测试的实现

    压力测试是通过模拟对WEB服务器的访问,进行记录响应时间,计算每秒处理数,记录上传下载的字节数。一般来说,一台测试机器上视机器的性能,发起 50~200的连接,基本就差不多了。考虑到测试机器的负载,一般使用多线程来完成多个WEB请求,幸运的是,Python对所有的这些支持的相当完善。以下是测试的代码

    # code by 李嘉
    # 禁止任何商业目的的转载
    # 不对因使用代码产生任何后果负任何责任
    # 转载请保留所有声明
    import threading, time, httplib, random
    # 需要测试的 url 列表,每一次的访问,我们随机取一个
    urls = [
    	"/test?page=",
    	"/test2?orderby=a&page=",
    	"/test2?orderby=d&page=",
    ]
    MAX_PAGE = 10000
    SERVER_NAME = "192.168.0.64:80"
    TEST_COUNT = 10000
    # 创建一个 threading.Thread 的派生类
    class RequestThread(threading.Thread):
    	# 构造函数
    	def __init__(self, thread_name):
    		threading.Thread.__init__(self)
    		self.test_count = 0
    
    	# 线程运行的入口函数
    	def run(self):
    		# 不直接把代码写在run里面是因为也许我们还要做其他形式的测试
    		i = 0
    		while i < TEST_COUNT:
    			self.test_performace()
    			i += 1
    		#self.test_other_things()
    
    	def test_performace(self):
    		conn = httplib.HTTPConnection(SERVER_NAME)
    		# 模拟 Keep-Alive 的访问, HTTP 1.1
    		for i in range(0, random.randint(0, 100)):
    			# 构造一个 url,提供随机参数的能力
    			url = urls[random.randint(0, len(urls) - 1)];
    			url += str(random.randint(0, MAX_PAGE))
    			# 这就连接到服务器上去
    			#print url
    			try:
    				conn.request("GET", url)
    				rsps = conn.getresponse()
    				if rsps.status == 200:
    					# 读取返回的数据
    					data = rsps.read()
    				self.test_count += 1
    			except:
    				continue
    			
    		conn.close()
    		
    # main 代码开始
    
    # 开始的时间
    start_time = time.time()
    threads = []
    # 并发的线程数
    thread_count = 100 
    
    i = 0
    while i < thread_count:
    	t = RequestThread("thread" + str(i))
    	threads.append(t)
    	t.start()
    	i += 1
    # 接受统计的命令
    word = ""
    while True:
    	word = raw_input("cmd:")
    	if word == "s":
    		time_span = time.time() - start_time
    		all_count = 0
    		for t in threads:
    			all_count += t.test_count
    		print "%s Request/Second" % str(all_count / time_span)
    	elif word == "e":
    		# 准备退出 其实 X 掉 窗口更加容易,没什么浪费的资源
    		TEST_COUNT = 0
    		for t in threads:
    			t.join(0)
    		break	
    

关键字