当前位置:首页 > IT技术 > 移动平台 > 正文

016、【前程贷—简化版,实战 09】 用 faker库 生成的随机手机号码,去注册,看是否注册过 。
2021-09-05 09:04:31

 

 

在 上一节  (015、【前程贷—简化版,实战 08】 用pymysql库,封装 handler_mysql  操作数据库 )  的基础上,增加 验证手机号是否被注册过?如果注册过再继续生成随机手机号码,直到得到的手机号码是未被注册的。

 

如何实现成功注册呢 :

步骤1、用faker库生成一个随机手机号码 ;

步骤2、用这个手机号码去数据库比对,是否注册过;如果注册过再生成一个随机手机号码 ;

步骤3、如果未被注册过,用生成的随机手机号码 (如:13123456789) 替换,excel表格中的标记字符 #phone# ;

步骤4、替换成功后,得到一个临时字典,用这个临时字典做为数据发送requests请求 ;

 

1、项目层级结构如下 :

 

2、excel表格设计如下,注意 #phone#

 

 

3、各代码如下,标记红色部分为主要修改点

a、config.ini  代码如下:

# 定义日志相关的配置
# name表示自定义日志搜集器的名字;
# level表示日志级别;
# file_name 表示生成的日志名字
# show_stream_handler表示是否在控制台输出日志。False表示不在控制台显示。
# when,表示按H(小时)、D(天)、M(分钟)、S(秒)生成日志 ;注意字母是大写 ;
[log]
name=future_logging_collector
level=INFO
file_name=future_loan_test.log
show_stream_handler=False
when=H

b、mysql_ini.py 代码如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 13:20
# Project: Future_Loan_day15
# Module:  mysql_ini.py


# 此配置文件用来配置链接mysql数据库
# key已经和代码关联了,不能更改key名;
MYSQL_INI = {
    "host": "api.lemonban.com",
    "port": 3306,
    "user": "future",
    "password": "123456",
    "database": "futureloan",
    "charset": "utf8"
}

c、test_1_register.py  代码如下(有代码修改)

import os
import json

import jsonpath
import pytest

from tools.handle_phone import HandlePhone
from tools.handler_request import HandlerRequests
from tools.handler_assert_list import HandlerAssertList
from tools.handler_excel import HandlerExcel
from tools.handler_logging import handler_logger

# 一、准备 数据 cases
# A、用os.path找到 xlsx 文件
file_dir = os.path.dirname(os.path.realpath('__file__'))
base_dir = os.path.dirname(file_dir)
print(f'根目录:{base_dir}')

file_name = os.path.join(base_dir, 'test_datas', 'test_register_cases.xlsx')
print(f'excel表格的路径:{file_name}')


# B、调用common——>excel_handler.py 中已封装好的ExcelHandler类来操作excel表格
# # 1、打开xlsx表格;2、根据表单名字获取表格数据;3、读取数据;
excel_handler = HandlerExcel(file_name)
excel_handler.select_sheet_by_name('register_case')
all_cases = excel_handler.read_all_rows_data()

# print(all_cases)
# handler_logger.info(f'=====all_cases=======:{all_cases}')


mrq = HandlerRequests()
add_assert = HandlerAssertList()
hp = HandlePhone()


class TestRegister:
    # 三、参数化测试用例
    # pytest.mark.parametrize("item", cases) 中的 "item" 必须 和 def test_my_requests(item): 中的 item 名字一样
    @pytest.mark.parametrize("item", all_cases)
    def test_my_register(self, item):
        # handler_logger.info('=====注册接口测试=======')
        handler_logger.info(f'"#phone#"未替换的前的数据:{item["req_data"]}=======')

        # 1、替换占位符
        # item 的值为:一行的数据,如下:
        """
            {
                'case_id': 'test_register_002',
                'case_name': '注册成功,普通用户',
                'method': 'POST',
                'url': 'http://api.lemonban.com/futureloan/member/register',
                'req_data': '{"mobile_phone": "#phone#","pwd": "12345678","reg_name":"Sky","type":1}',
                'assert_response_value_list': '[{"expr":"$.code","expected":0,"type":"eq"},{"expr":"$.msg","expected":"OK","type":"eq"}]',
                'actual_results': None
            }
        """

        # 1、用随机生成的,未被注册过的手机号码,把 标记 "#phone#"的字符串,替换;
     # a、查询数据库,并得到一个未被注册的手机号码 new_phone = hp.get_phone() # handler_logger.info(f'=====new_phone:{new_phone}=======') # handler_logger.info(f'=====对比结果:{item["req_data"].find("#phone#") != -1}=======')       

     # b、判断字符串中是否含有 #phone#,如果有把它替换成 刚随机生成的手机号码 if item["req_data"] and item["req_data"].find("#phone#") != -1: item['req_data'] = item['req_data'].replace("#phone#", new_phone) if item["assert_response_value_list"] and item["assert_response_value_list"].find("#phone#") != -1: item["assert_response_value_list"] = item["assert_response_value_list"].replace("#phone#", new_phone) # 2、把json格式的字符串转换成 字典 temp_req_data_dict = json.loads(item['req_data']) handler_logger.info(f'用随机手机号码替换"#phone#"得到的数据:{temp_req_data_dict}=======')
# 3、发起请求 resp = mrq.send_reuqests(item['method'], item['url'], temp_req_data_dict) print(resp.json()) # 4、断言 add_assert.assert_response_value(item["assert_response_value_list"], resp.json())

 

d、handle_phone.py  代码如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com

from faker import Faker
from tools.handler_mysql import handler_mysql
from tools.handler_logging import handler_logger


class HandlePhone:
    def __init__(self):
        self.fk = Faker(locale='zh-cn')

    # 如果系统对手机号前缀有要求的,要做前缀校验
    def __faker_phone(self):
        phone = self.fk.phone_number()
        handler_logger.info(f'随机生成的手机号为:{phone}')
        return phone

    # 获取数据库查询结果
    def __select_phone(self, phone):
        sql = "select * from member where mobile_phone = '{}'".format(phone)
        select_phone_result = handler_mysql.get_db_all_data(sql=sql)
        return select_phone_result

    # 获取未注册的手机号
    # 1、生成一个随机手机号码
    # 2、用生成的手机号码去数据库查询,如果查询结果大于1行说明已经存在,被注册过;
    def get_phone(self):
        while True:
            phone = self.__faker_phone()  # 随机生成手机号
            select_phone_result = self.__select_phone(phone)  # 拿到数据库执行结果
            if len(select_phone_result) > 0:
                continue
            else:
                return phone


if __name__ == '__main__':
    cl = HandlePhone()
    result = cl.get_phone()
    print(result)

 

e、handler_assert_list.py 代码如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 16:41
# Project: Future_Loan_day15
# Module:  handler_assert_list.py

import jsonpath
import ast


# 封装 assert_list列 添加断言
class HandlerAssertList:

    def assert_response_value(self, assert_response_value_list, response_dict):
        """
        根据响应的值断言
        :param assert_response_value_list: 从excel表格中获取到的断言字符串,比如:'[{"expr":"$.code","expected":0,"type":"eq"},{"expr":"$.msg","expected":"账号已存在","type":"eq"}]'
        :param response_dict: 发起请求后得到的响应结果
        :return:
        """
        # 把字符串转换成python列表
        # 比eval安全一点,转换成列表。eval去掉最外层引号后还会自动计算,literal_eval仅去掉最外层引号;
        check_list = ast.literal_eval(assert_response_value_list)
        # print(check_list)

        # 每一个单元格所有断言的比对结果存放在check_res
        check_res = []
        for check in check_list:
            # 通过jsonpath表达式,从响应结果中拿到实际结果
            actual = jsonpath.jsonpath(response_dict, check["expr"])
            if isinstance(actual, list):
                actual = actual[0]
            # 与实际结果比对
            if check["type"] == "eq":
                # print(f'
实际比对结果:{actual == check["expected"]}')
                check_res.append(actual == check["expected"])

        # print(f'所有断言结果:{check_res}')
        # 如果 断言列表中有False,抛出 断言异常
        if False in check_res:
            raise AssertionError


if __name__ == '__main__':
    # 测试代码
    assert_response_value_list = '[{"expr":"$.code","expected":0,"type":"eq"},' 
                                 '{"expr":"$.msg","expected":"OK","type":"eq"}]'
    response_dict = {
        "code": 0,
        "msg": "OK",
        "data": {
            "id": 123660458,
            "reg_name": "Sky",
            "mobile_phone": "13321886699"
        },
        "copyright": "Copyright 柠檬班 © 2017-2020 湖南省零檬信息技术有限公司 All Rights Reserved"
    }

    add_assert = HandlerAssertList()
    check_res = add_assert.assert_response_value(assert_response_value_list, response_dict)

 

f、handler_conf.py  代码如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 17:58
# Project: Future_Loan_day15
# Module:  handler_conf.py

from configparser import ConfigParser


class HandlerConf(ConfigParser):

    def __init__(self, file_name):
        super().__init__()
        self.read(file_name, encoding='utf-8')

 

g、handler_excel.py  代码如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 16:34
# Project: Future_Loan_day15
# Module:  handler_excel.py

import openpyxl

from tools.handler_logging import handler_logger


# 封装一个xlsx表格操作类
class HandlerExcel:
    # 操作一个excel表格:
    # 第一步:打开工作簿
    # 第二步:选取表单
    # 第三步:读取数据
    # 第四步:关闭打开的工作簿

    def __init__(self, xlsx_file_path: str):
        """
        传入一个xlsx文件路径,用load_workbook()方法加载,如果文件加载不成功,抛出异常。如果成功,打开一个工作簿。
        :param xlsx_file_path: xlsx文件路径
        """
        try:
            self.wb = openpyxl.load_workbook(xlsx_file_path)
        except FileNotFoundError as ffe:
            # print('打开文件失败')
            handler_logger.error(ffe)
            raise
        # 不确定打开的是哪个表单
        self.sh = None

    def close_workbook(self):
        """
        关闭当前打开的工作簿
        :return:
        """
        self.wb.close()

    def select_sheet_by_name(self, sheet_name: str):
        """
        根据传入的工作表的名字,打开工作表。
        :param sheet_name: 作表的名字
        """
        self.sh = self.wb[f'{sheet_name}']

    def read_all_rows_data(self):
        """
        从选定的表单当中,第一行作为key.
        将后面的每一行数据,与第一行拼接成一个字典数据,作为一条测试用例数据。
        将所有测试用例数据,添加到一个列表当中。
       :return: 测试用例数据列表
        """
        # 获取表单的所有行,即获取表单的所有数据
        sheet_all_rows = list(self.sh.values)
        # 把第一行作为数据的keys
        keys = sheet_all_rows[0]
        # print(keys)

        # 定义 cases_list 存放测试用例
        cases_list = []

        # 以下代码功能:excel表单第2行开始的每一行测试数据,与第一行的keys拼接成一个字典。
        for single_row in sheet_all_rows[1:]:
            case_dict = dict(zip(keys, single_row))
            cases_list.append(case_dict)
        return cases_list


if __name__ == '__main__':
    eh = HandlerExcel(r'D:SkyWorkSpaceWorkSpaceAPI_testlmFuture_Loan'
                      r'Future_Loan_day16	est_datas	est_register_cases.xlsx')
    eh.select_sheet_by_name('register_case')
    print(eh.read_all_rows_data())
    eh.close_workbook()

 

h、handler_logging.py 代码如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com


import logging
# 自定义一个日志模块
import os
# 导入 ConfigParser 类 ,用来操作 config.ini 文件 ;
import time
from configparser import ConfigParser
from logging import Logger
from logging import handlers


class HandlerLogger(Logger):

    def __init__(self):

        # 一、用 ConfigParser类 来操作 config.ini 配置文件 ;
        # 实例化一个 ConfigParser ;
        conf = ConfigParser()

        # 1、获取config.ini文件
        common_dir = os.path.dirname(os.path.realpath(__file__))
        base_dir = os.path.dirname(common_dir)
        config_ini_file = os.path.join(base_dir, 'conf', 'config.ini')

        # 2、从配置文件获取值
        conf.read(config_ini_file, encoding='utf-8')
        logger_name = conf.get('log', 'name')
        level = conf.get('log', 'level')
        file_name = conf.get('log', 'file_name')
        show_stream_handler = conf.get('log', 'show_stream_handler')
        when = conf.get('log', 'when')

        # 二、设置自定义日志搜集器名字、设置日志级别;
        super().__init__(logger_name, level)

        # 三、定义日志输出格式, 使用Formatter类实例化一个日志格式类;
        fmt = '%(asctime)s, %(levelname)s, %(message)s, %(name)s, %(pathname)s,line=%(lineno)d'
        # fmt = '%(asctime)s, %(levelname)s, %(message)s, %(name)s,line=%(lineno)d'
        formatter = logging.Formatter(fmt)

        # 四A、日志默认输出到控制台,如果设置为False,日志将不输出到控制台;
        if show_stream_handler == 'True':
            stream_handler = logging.StreamHandler()
            # 设置渠道当中的日志格式
            stream_handler.setFormatter(formatter)
            # 将渠道与实例日志搜集器绑定
            self.addHandler(stream_handler)

        # 四B、把日志输出到文件file
        # 首先拼接存放log的file文件
        logs_file = os.path.join(base_dir, 'logs', file_name)
        print(logs_file)
        if logs_file:
            file_handle = handlers.TimedRotatingFileHandler(filename=logs_file,
                                                            when=when,
                                                            encoding='utf-8',
                                                            interval=1,
                                                            backupCount=5)

            # 设置渠道当中的日志格式
            file_handle.setFormatter(formatter)
            # 将渠道与实例日志搜集器绑定
            self.addHandler(file_handle)


# 生成一个 handler_logger 实例,在其他所有模块中导入该模块时,共用这一个日志搜集实例。handler_logger 类似于 全局变量
# 日志搜集是典型的单列设计模式 (单实例模式) 。
handler_logger = HandlerLogger()

if __name__ == '__main__':
    handler_logger = HandlerLogger()
    for i in range(10):
        time.sleep(1)
        handler_logger.debug('=====debug=====')
        handler_logger.info('=====info=====')
        handler_logger.warning('=====warning=====')
        handler_logger.error('=====error=====')

 

i、handler_mysql.py 代码如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 13:18
# Project: Future_Loan_day15
# Module:  handler_mysql.py

import pymysql

from conf.mysql_ini import MYSQL_INI


# 封装操作Mysql数据库类
class HandleMysql:
    def __init__(self):
        """
        1、初始化建立连接,创建数据库连接
        charset='utf8', 注意,不是utf-8 哦
        返回数据格式控制: cursorclass=pymysql.cursors.DictCursor  加上这个表示返回字典格式的数据;不加的话,以元组的形式返回;
        """
        self.connection = pymysql.connect(
            host=MYSQL_INI['host'],
            port=MYSQL_INI['port'],
            user=MYSQL_INI['user'],
            password=MYSQL_INI['password'],
            database=MYSQL_INI['database'],
            charset=MYSQL_INI['charset'],
            cursorclass=pymysql.cursors.DictCursor)
        # 2、创建游标
        self.cur = self.connection.cursor()

    # 获取 查询得到的行数(数量)
    def get_count(self, sql):
        count = self.cur.execute(sql)
        return count

    # 获取一条数据,一般都是最前面的那条数据
    def get_db_one_data(self, sql):
        self.cur.execute(sql)
        return self.cur.fetchone()

    # 获取全部数据
    def get_db_all_data(self, sql):
        self.cur.execute(sql)
        return self.cur.fetchall()

    # 关闭数据库连接
    def close(self):
        self.cur.close()
        self.connection.close()


# 使用单例模式,后续导入数据库就只导入 handler_mysql
handler_mysql = HandleMysql()


if __name__ == '__main__':

    # 1、建立链接
    handler_mysql = HandleMysql()

    # 2、执行 sql 语句
    # 先在 sql客户端 (Navicat Premium 12免安装) 执行以下sql语句,查询结果
    phone = 18837906872
    sql_str_2 = f"select * from member where mobile_phone='{phone}'"    # 方式二
    # 执行sql语句
    results = handler_mysql.get_db_one_data(sql_str_2)
    print(results)

 

j、handler_request.py  代码如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
import requests
from tools.handler_logging import handler_logger


class HandlerRequests:

    def __init__(self):
        """
        初始化方法,初始化请求头;
        """
        self.headers = {"X-Lemonban-Media-Type": "lemonban.v2"}
        handler_logger.info(f'请求头为:{self.headers}')

    # 方法 post/put...  json=xxx, get方法用 params=xxx
    def send_reuqests(self, method, url, req_data, token=None):
        """
        调用requests库里面的方法去发起请求,并得到响应结果;
        :param url: 接口url
        :param method: 请求方法,get,psot
        :param req_data: 请求数据
        :param token: 如果有token,添加token
        """
        handler_logger.info(f'请求方法为:{method}')
        handler_logger.info(f'请求url为:{url}')
        handler_logger.info(f'请求数据为:{req_data}')

        # 如果有token,添加token
        self.__del_header(token)

        # 注意 request() 方法是不带s的,requests库是带s的;
        if method.upper() == "GET":
            resp = requests.request(method, url, params=req_data, headers=self.headers)
            return resp
        if method.upper() == "POST":
            # 为了便于学习,简单的认为就是用 json 格式传;
            resp = requests.request(method, url, json=req_data, headers=self.headers)
            return resp

    def __del_header(self, token=None):
        """
        如果有token,添加token处理
        :param token: 如果有token,添加token处理
        """
        if token:
            self.headers["Authorization"] = f"Bearer {token}"


if __name__ == '__main__':
    handler_requests = HandlerRequests()

 

k、操作mysql.py  代码如下:

# -*- coding:utf-8 -*-
# Author:  Sky
# Email:   2780619724@qq.com
# Time:    2021/9/4 10:02
# Project: Future_Loan_day15
# Module:  操作mysql.py


import pymysql

# 1、建立连接,创建数据库连接
# charset='utf8', 注意,不是utf-8 哦
# 返回数据格式控制: cursorclass=pymysql.cursors.DictCursor  加上这个表示返回字典格式的数据;不加的话,以元组的形式返回;
connection = pymysql.connect(host='api.lemonban.com',
                             port=3306,
                             user='future',
                             password='123456',
                             database='futureloan',
                             charset='utf8',
                             cursorclass=pymysql.cursors.DictCursor)

# 2、创建游标
cur = connection.cursor()

# 3、执行sql语句 , 返回数据
sql_str = "select * from member where reg_name='娜娜'"   # 方式一

# phone = 18837906872
# sql_str_2 = f"select * from member where mobile_phone='{phone}'"    # 方式二

affected_rows = cur.execute(sql_str)    # 返回影响的行数
print(f'返回影响的行数:{affected_rows}')

# 4、获取查询的结果
first_data = cur.fetchone()  # 获取一条数据,第一条数据;字典形式返回 ;
all_data = cur.fetchall()  # 获取全部数据;列表形式返回 ;
two_data = cur.fetchmany(size=2)   # 获取前2行数据;列表形式返回 ;
print(f'获取第一条数据:{first_data}')
print(f'获取全部数据:{all_data}')
print(f'获取前2行数据:{two_data}')

# 5、关闭数据库连接
cur.close()     # 首先关闭游标
connection.close()      # 关闭数据库

运行方式如下:

 

执行结果如下:

 

本文摘自 :https://www.cnblogs.com/

开通会员,享受整站包年服务立即开通 >