在 上一节 (015、【前程贷—简化版,实战 08】 用pymysql库,封装 handler_mysql 操作数据库 ) 的基础上,增加 验证手机号是否被注册过?如果注册过再继续生成随机手机号码,直到得到的手机号码是未被注册的。
如何实现成功注册呢 :
步骤1、用faker库生成一个随机手机号码 ;
步骤2、用这个手机号码去数据库比对,是否注册过;如果注册过再生成一个随机手机号码 ;
步骤3、如果未被注册过,用生成的随机手机号码 (如:13123456789) 替换,excel表格中的标记字符 #phone# ;
步骤4、替换成功后,得到一个临时字典,用这个临时字典做为数据发送requests请求 ;
1、项目层级结构如下 :
2、excel表格设计如下,注意 #phone# :
3、各代码如下,标记红色部分为主要修改点 :
a、config.ini 代码如下:
# 定义日志相关的配置 # name表示自定义日志搜集器的名字; # level表示日志级别; # file_name 表示生成的日志名字 # show_stream_handler表示是否在控制台输出日志。False表示不在控制台显示。 # when,表示按H(小时)、D(天)、M(分钟)、S(秒)生成日志 ;注意字母是大写 ; [log] name=future_logging_collector level=INFO file_name=future_loan_test.log show_stream_handler=False when=H
b、mysql_ini.py 代码如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 13:20 # Project: Future_Loan_day15 # Module: mysql_ini.py # 此配置文件用来配置链接mysql数据库 # key已经和代码关联了,不能更改key名; MYSQL_INI = { "host": "api.lemonban.com", "port": 3306, "user": "future", "password": "123456", "database": "futureloan", "charset": "utf8" }
c、test_1_register.py 代码如下(有代码修改):
import os import json import jsonpath import pytest from tools.handle_phone import HandlePhone from tools.handler_request import HandlerRequests from tools.handler_assert_list import HandlerAssertList from tools.handler_excel import HandlerExcel from tools.handler_logging import handler_logger # 一、准备 数据 cases # A、用os.path找到 xlsx 文件 file_dir = os.path.dirname(os.path.realpath('__file__')) base_dir = os.path.dirname(file_dir) print(f'根目录:{base_dir}') file_name = os.path.join(base_dir, 'test_datas', 'test_register_cases.xlsx') print(f'excel表格的路径:{file_name}') # B、调用common——>excel_handler.py 中已封装好的ExcelHandler类来操作excel表格 # # 1、打开xlsx表格;2、根据表单名字获取表格数据;3、读取数据; excel_handler = HandlerExcel(file_name) excel_handler.select_sheet_by_name('register_case') all_cases = excel_handler.read_all_rows_data() # print(all_cases) # handler_logger.info(f'=====all_cases=======:{all_cases}') mrq = HandlerRequests() add_assert = HandlerAssertList() hp = HandlePhone() class TestRegister: # 三、参数化测试用例 # pytest.mark.parametrize("item", cases) 中的 "item" 必须 和 def test_my_requests(item): 中的 item 名字一样 @pytest.mark.parametrize("item", all_cases) def test_my_register(self, item): # handler_logger.info('=====注册接口测试=======') handler_logger.info(f'"#phone#"未替换的前的数据:{item["req_data"]}=======') # 1、替换占位符 # item 的值为:一行的数据,如下: """ { 'case_id': 'test_register_002', 'case_name': '注册成功,普通用户', 'method': 'POST', 'url': 'http://api.lemonban.com/futureloan/member/register', 'req_data': '{"mobile_phone": "#phone#","pwd": "12345678","reg_name":"Sky","type":1}', 'assert_response_value_list': '[{"expr":"$.code","expected":0,"type":"eq"},{"expr":"$.msg","expected":"OK","type":"eq"}]', 'actual_results': None } """ # 1、用随机生成的,未被注册过的手机号码,把 标记 "#phone#"的字符串,替换;
# a、查询数据库,并得到一个未被注册的手机号码 new_phone = hp.get_phone() # handler_logger.info(f'=====new_phone:{new_phone}=======') # handler_logger.info(f'=====对比结果:{item["req_data"].find("#phone#") != -1}=======')
# b、判断字符串中是否含有 #phone#,如果有把它替换成 刚随机生成的手机号码 if item["req_data"] and item["req_data"].find("#phone#") != -1: item['req_data'] = item['req_data'].replace("#phone#", new_phone) if item["assert_response_value_list"] and item["assert_response_value_list"].find("#phone#") != -1: item["assert_response_value_list"] = item["assert_response_value_list"].replace("#phone#", new_phone) # 2、把json格式的字符串转换成 字典 temp_req_data_dict = json.loads(item['req_data']) handler_logger.info(f'用随机手机号码替换"#phone#"得到的数据:{temp_req_data_dict}=======') # 3、发起请求 resp = mrq.send_reuqests(item['method'], item['url'], temp_req_data_dict) print(resp.json()) # 4、断言 add_assert.assert_response_value(item["assert_response_value_list"], resp.json())
d、handle_phone.py 代码如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com from faker import Faker from tools.handler_mysql import handler_mysql from tools.handler_logging import handler_logger class HandlePhone: def __init__(self): self.fk = Faker(locale='zh-cn') # 如果系统对手机号前缀有要求的,要做前缀校验 def __faker_phone(self): phone = self.fk.phone_number() handler_logger.info(f'随机生成的手机号为:{phone}') return phone # 获取数据库查询结果 def __select_phone(self, phone): sql = "select * from member where mobile_phone = '{}'".format(phone) select_phone_result = handler_mysql.get_db_all_data(sql=sql) return select_phone_result # 获取未注册的手机号 # 1、生成一个随机手机号码 # 2、用生成的手机号码去数据库查询,如果查询结果大于1行说明已经存在,被注册过; def get_phone(self): while True: phone = self.__faker_phone() # 随机生成手机号 select_phone_result = self.__select_phone(phone) # 拿到数据库执行结果 if len(select_phone_result) > 0: continue else: return phone if __name__ == '__main__': cl = HandlePhone() result = cl.get_phone() print(result)
e、handler_assert_list.py 代码如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 16:41 # Project: Future_Loan_day15 # Module: handler_assert_list.py import jsonpath import ast # 封装 assert_list列 添加断言 class HandlerAssertList: def assert_response_value(self, assert_response_value_list, response_dict): """ 根据响应的值断言 :param assert_response_value_list: 从excel表格中获取到的断言字符串,比如:'[{"expr":"$.code","expected":0,"type":"eq"},{"expr":"$.msg","expected":"账号已存在","type":"eq"}]' :param response_dict: 发起请求后得到的响应结果 :return: """ # 把字符串转换成python列表 # 比eval安全一点,转换成列表。eval去掉最外层引号后还会自动计算,literal_eval仅去掉最外层引号; check_list = ast.literal_eval(assert_response_value_list) # print(check_list) # 每一个单元格所有断言的比对结果存放在check_res check_res = [] for check in check_list: # 通过jsonpath表达式,从响应结果中拿到实际结果 actual = jsonpath.jsonpath(response_dict, check["expr"]) if isinstance(actual, list): actual = actual[0] # 与实际结果比对 if check["type"] == "eq": # print(f' 实际比对结果:{actual == check["expected"]}') check_res.append(actual == check["expected"]) # print(f'所有断言结果:{check_res}') # 如果 断言列表中有False,抛出 断言异常 if False in check_res: raise AssertionError if __name__ == '__main__': # 测试代码 assert_response_value_list = '[{"expr":"$.code","expected":0,"type":"eq"},' '{"expr":"$.msg","expected":"OK","type":"eq"}]' response_dict = { "code": 0, "msg": "OK", "data": { "id": 123660458, "reg_name": "Sky", "mobile_phone": "13321886699" }, "copyright": "Copyright 柠檬班 © 2017-2020 湖南省零檬信息技术有限公司 All Rights Reserved" } add_assert = HandlerAssertList() check_res = add_assert.assert_response_value(assert_response_value_list, response_dict)
f、handler_conf.py 代码如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 17:58 # Project: Future_Loan_day15 # Module: handler_conf.py from configparser import ConfigParser class HandlerConf(ConfigParser): def __init__(self, file_name): super().__init__() self.read(file_name, encoding='utf-8')
g、handler_excel.py 代码如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 16:34 # Project: Future_Loan_day15 # Module: handler_excel.py import openpyxl from tools.handler_logging import handler_logger # 封装一个xlsx表格操作类 class HandlerExcel: # 操作一个excel表格: # 第一步:打开工作簿 # 第二步:选取表单 # 第三步:读取数据 # 第四步:关闭打开的工作簿 def __init__(self, xlsx_file_path: str): """ 传入一个xlsx文件路径,用load_workbook()方法加载,如果文件加载不成功,抛出异常。如果成功,打开一个工作簿。 :param xlsx_file_path: xlsx文件路径 """ try: self.wb = openpyxl.load_workbook(xlsx_file_path) except FileNotFoundError as ffe: # print('打开文件失败') handler_logger.error(ffe) raise # 不确定打开的是哪个表单 self.sh = None def close_workbook(self): """ 关闭当前打开的工作簿 :return: """ self.wb.close() def select_sheet_by_name(self, sheet_name: str): """ 根据传入的工作表的名字,打开工作表。 :param sheet_name: 作表的名字 """ self.sh = self.wb[f'{sheet_name}'] def read_all_rows_data(self): """ 从选定的表单当中,第一行作为key. 将后面的每一行数据,与第一行拼接成一个字典数据,作为一条测试用例数据。 将所有测试用例数据,添加到一个列表当中。 :return: 测试用例数据列表 """ # 获取表单的所有行,即获取表单的所有数据 sheet_all_rows = list(self.sh.values) # 把第一行作为数据的keys keys = sheet_all_rows[0] # print(keys) # 定义 cases_list 存放测试用例 cases_list = [] # 以下代码功能:excel表单第2行开始的每一行测试数据,与第一行的keys拼接成一个字典。 for single_row in sheet_all_rows[1:]: case_dict = dict(zip(keys, single_row)) cases_list.append(case_dict) return cases_list if __name__ == '__main__': eh = HandlerExcel(r'D:SkyWorkSpaceWorkSpaceAPI_testlmFuture_Loan' r'Future_Loan_day16 est_datas est_register_cases.xlsx') eh.select_sheet_by_name('register_case') print(eh.read_all_rows_data()) eh.close_workbook()
h、handler_logging.py 代码如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com import logging # 自定义一个日志模块 import os # 导入 ConfigParser 类 ,用来操作 config.ini 文件 ; import time from configparser import ConfigParser from logging import Logger from logging import handlers class HandlerLogger(Logger): def __init__(self): # 一、用 ConfigParser类 来操作 config.ini 配置文件 ; # 实例化一个 ConfigParser ; conf = ConfigParser() # 1、获取config.ini文件 common_dir = os.path.dirname(os.path.realpath(__file__)) base_dir = os.path.dirname(common_dir) config_ini_file = os.path.join(base_dir, 'conf', 'config.ini') # 2、从配置文件获取值 conf.read(config_ini_file, encoding='utf-8') logger_name = conf.get('log', 'name') level = conf.get('log', 'level') file_name = conf.get('log', 'file_name') show_stream_handler = conf.get('log', 'show_stream_handler') when = conf.get('log', 'when') # 二、设置自定义日志搜集器名字、设置日志级别; super().__init__(logger_name, level) # 三、定义日志输出格式, 使用Formatter类实例化一个日志格式类; fmt = '%(asctime)s, %(levelname)s, %(message)s, %(name)s, %(pathname)s,line=%(lineno)d' # fmt = '%(asctime)s, %(levelname)s, %(message)s, %(name)s,line=%(lineno)d' formatter = logging.Formatter(fmt) # 四A、日志默认输出到控制台,如果设置为False,日志将不输出到控制台; if show_stream_handler == 'True': stream_handler = logging.StreamHandler() # 设置渠道当中的日志格式 stream_handler.setFormatter(formatter) # 将渠道与实例日志搜集器绑定 self.addHandler(stream_handler) # 四B、把日志输出到文件file # 首先拼接存放log的file文件 logs_file = os.path.join(base_dir, 'logs', file_name) print(logs_file) if logs_file: file_handle = handlers.TimedRotatingFileHandler(filename=logs_file, when=when, encoding='utf-8', interval=1, backupCount=5) # 设置渠道当中的日志格式 file_handle.setFormatter(formatter) # 将渠道与实例日志搜集器绑定 self.addHandler(file_handle) # 生成一个 handler_logger 实例,在其他所有模块中导入该模块时,共用这一个日志搜集实例。handler_logger 类似于 全局变量 # 日志搜集是典型的单列设计模式 (单实例模式) 。 handler_logger = HandlerLogger() if __name__ == '__main__': handler_logger = HandlerLogger() for i in range(10): time.sleep(1) handler_logger.debug('=====debug=====') handler_logger.info('=====info=====') handler_logger.warning('=====warning=====') handler_logger.error('=====error=====')
i、handler_mysql.py 代码如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 13:18 # Project: Future_Loan_day15 # Module: handler_mysql.py import pymysql from conf.mysql_ini import MYSQL_INI # 封装操作Mysql数据库类 class HandleMysql: def __init__(self): """ 1、初始化建立连接,创建数据库连接 charset='utf8', 注意,不是utf-8 哦 返回数据格式控制: cursorclass=pymysql.cursors.DictCursor 加上这个表示返回字典格式的数据;不加的话,以元组的形式返回; """ self.connection = pymysql.connect( host=MYSQL_INI['host'], port=MYSQL_INI['port'], user=MYSQL_INI['user'], password=MYSQL_INI['password'], database=MYSQL_INI['database'], charset=MYSQL_INI['charset'], cursorclass=pymysql.cursors.DictCursor) # 2、创建游标 self.cur = self.connection.cursor() # 获取 查询得到的行数(数量) def get_count(self, sql): count = self.cur.execute(sql) return count # 获取一条数据,一般都是最前面的那条数据 def get_db_one_data(self, sql): self.cur.execute(sql) return self.cur.fetchone() # 获取全部数据 def get_db_all_data(self, sql): self.cur.execute(sql) return self.cur.fetchall() # 关闭数据库连接 def close(self): self.cur.close() self.connection.close() # 使用单例模式,后续导入数据库就只导入 handler_mysql handler_mysql = HandleMysql() if __name__ == '__main__': # 1、建立链接 handler_mysql = HandleMysql() # 2、执行 sql 语句 # 先在 sql客户端 (Navicat Premium 12免安装) 执行以下sql语句,查询结果 phone = 18837906872 sql_str_2 = f"select * from member where mobile_phone='{phone}'" # 方式二 # 执行sql语句 results = handler_mysql.get_db_one_data(sql_str_2) print(results)
j、handler_request.py 代码如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com import requests from tools.handler_logging import handler_logger class HandlerRequests: def __init__(self): """ 初始化方法,初始化请求头; """ self.headers = {"X-Lemonban-Media-Type": "lemonban.v2"} handler_logger.info(f'请求头为:{self.headers}') # 方法 post/put... json=xxx, get方法用 params=xxx def send_reuqests(self, method, url, req_data, token=None): """ 调用requests库里面的方法去发起请求,并得到响应结果; :param url: 接口url :param method: 请求方法,get,psot :param req_data: 请求数据 :param token: 如果有token,添加token """ handler_logger.info(f'请求方法为:{method}') handler_logger.info(f'请求url为:{url}') handler_logger.info(f'请求数据为:{req_data}') # 如果有token,添加token self.__del_header(token) # 注意 request() 方法是不带s的,requests库是带s的; if method.upper() == "GET": resp = requests.request(method, url, params=req_data, headers=self.headers) return resp if method.upper() == "POST": # 为了便于学习,简单的认为就是用 json 格式传; resp = requests.request(method, url, json=req_data, headers=self.headers) return resp def __del_header(self, token=None): """ 如果有token,添加token处理 :param token: 如果有token,添加token处理 """ if token: self.headers["Authorization"] = f"Bearer {token}" if __name__ == '__main__': handler_requests = HandlerRequests()
k、操作mysql.py 代码如下:
# -*- coding:utf-8 -*- # Author: Sky # Email: 2780619724@qq.com # Time: 2021/9/4 10:02 # Project: Future_Loan_day15 # Module: 操作mysql.py import pymysql # 1、建立连接,创建数据库连接 # charset='utf8', 注意,不是utf-8 哦 # 返回数据格式控制: cursorclass=pymysql.cursors.DictCursor 加上这个表示返回字典格式的数据;不加的话,以元组的形式返回; connection = pymysql.connect(host='api.lemonban.com', port=3306, user='future', password='123456', database='futureloan', charset='utf8', cursorclass=pymysql.cursors.DictCursor) # 2、创建游标 cur = connection.cursor() # 3、执行sql语句 , 返回数据 sql_str = "select * from member where reg_name='娜娜'" # 方式一 # phone = 18837906872 # sql_str_2 = f"select * from member where mobile_phone='{phone}'" # 方式二 affected_rows = cur.execute(sql_str) # 返回影响的行数 print(f'返回影响的行数:{affected_rows}') # 4、获取查询的结果 first_data = cur.fetchone() # 获取一条数据,第一条数据;字典形式返回 ; all_data = cur.fetchall() # 获取全部数据;列表形式返回 ; two_data = cur.fetchmany(size=2) # 获取前2行数据;列表形式返回 ; print(f'获取第一条数据:{first_data}') print(f'获取全部数据:{all_data}') print(f'获取前2行数据:{two_data}') # 5、关闭数据库连接 cur.close() # 首先关闭游标 connection.close() # 关闭数据库
运行方式如下:
执行结果如下:
本文摘自 :https://www.cnblogs.com/