django+selenium 12306接口车票查询

发布时间:2019-05-12 22:16:31编辑:Run阅读(6786)

    WIN环境下

    selenium环境安装

    pip3 install selenium 

    还需要下载一个谷歌浏览器对应的chromedriver,下载地址:https://npm.taobao.org/mirrors/chromedriver/

    放在python安装的对应目录即可,如下:

    image.png

    至于linux,mac环境可以参考其他的文章


    12306页面分析:

    在查询票这里,可以看到url的变化,是以拼接的方式构成的完整的url

    kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc&fs=上海,SHH&ts=成都,CDW&date=2019-05-15&flag=N,N,Y

    上海,SHH  SHH为城市编号

    成都,CDW CDW为城市编号

    date=2019-05-15 为出发的日期

    如下图所标:

    image.png


    只要找到各个城市对应编号,构造请求的url,就可以实现查询。

    城市编码需要到首页获取:https://www.12306.cn/index/

    image.png


    我这里只获取了热门城市的编号。之前用requests去请求拿不到数据,因为这部分内容是由JS动态加载的,所以还是用selenium吧,无奈....

    完整代码如下:

    from selenium import webdriver
    import time
    import json
    
    
    class QueryTicket:
        """Scrape the hot-city station codes from the 12306 home page."""

        def main(self):
            """Collect hot-city names and codes and save them as JSON.

            Loads the 12306 index page in headless Chrome, reads the ``title``
            and ``data`` attributes of every ``<li>`` under
            ``ul.popcitylist`` (stored as city name -> city code), prints the
            mapping and writes it to ``city_number.txt`` in the current
            working directory.

            Errors are printed rather than raised, and the browser is always
            closed. Nothing is written when no city could be scraped, so an
            existing ``city_number.txt`` is never replaced by an empty mapping.
            """
            # Local import: the surrounding script only imports `webdriver`.
            from selenium.webdriver.common.by import By

            url = 'https://www.12306.cn/index/index.html'
            options = webdriver.ChromeOptions()
            options.add_argument('--disable-infobars')
            options.add_argument('--start-maximized')
            options.add_argument('--headless')
            # `options=` is the supported keyword; `chrome_options=` is
            # deprecated and rejected by recent Selenium releases.
            browser = webdriver.Chrome(options=options)
            city_number_dict = {}
            try:
                browser.get(url)
                browser.implicitly_wait(20)
                # Give the JS-rendered city list time to appear.
                time.sleep(3)
                # Locate the hot-city list items.
                elements = browser.find_elements(By.XPATH, "//ul[@class='popcitylist']/li")
                for item in elements:
                    city_name = item.get_attribute('title')
                    city_number = item.get_attribute('data')
                    # Skip list items that lack either attribute.
                    if city_name and city_number:
                        city_number_dict.setdefault(city_name, city_number)
                print(city_number_dict)
                if not city_number_dict:
                    print('no city codes found; city_number.txt left unchanged')
                    return
                with open("city_number.txt", encoding='utf-8', mode='w') as f:
                    f.write(json.dumps(city_number_dict, ensure_ascii=False))

            except Exception as e:
                print(e)
            finally:
                browser.quit()


    if __name__ == '__main__':
        st = QueryTicket()
        st.main()

    运行结果:

    {'北京': 'BJP', '上海': 'SHH', '天津': 'TJP', '重庆': 'CQW', '长沙': 'CSQ', '长春': 'CCT', '成都': 'CDW', '福州': 'FZS', '广州': 'GZQ', '贵阳': 'GIW', '呼和浩特': 'HHC', '哈尔滨': 'HBB', '合肥': 'HFH', '杭州': 'HZH', '海口': 'VUQ', '济南': 'JNK', '昆明': 'KMM', '拉萨': 'LSO', '兰州': 'LZJ', '南宁': 'NNZ', '南京': 'NJH', '南昌': 'NCG', '沈阳': 'SYT', '石家庄': 'SJP', '太原': 'TYV', '乌鲁木齐南': 'WMR', '武汉': 'WHN', '西宁': 'XNO', '西安': 'XAY', '银川': 'YIJ', '郑州': 'ZZF', '深圳': 'SZQ', '厦门': 'XMS'}



    拿到城市,以及城市对应的编号,就可以构造请求,获取车票信息了.

    只需要把城市,城市编号,出发日 拼接成一个完整的url即可,如下:

    https://kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc&fs=上海,SHH&ts=成都,CDW&date=2019-05-15&flag=N,N,Y

    这里我又尝试用requests请求,还是没有数据,只能老老实实用selenium吧

    引用了一个资源文件,里面放的是user-agent

    resource.py内容如下:

    #!/usr/bin/env python
    # coding: utf-8
    # Pool of desktop browser User-Agent strings; callers pick one at random
    # per request.
    UserAgents = [
        "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:6.0) Gecko/20100101 Firefox/6.0",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
        "Opera/9.80 (Windows NT 6.1; U; zh-cn) Presto/2.9.168 Version/11.50",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)",
        # The original entry was truncated ("...Trident/5.0;") and left the
        # parenthesis unclosed; it is closed here so the header is well-formed.
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)",
        "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
        "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
        "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Avant Browser)",
    ]


    完整代码如下:

    #!/usr/bin/env python
    # coding: utf-8
    
    from selenium import webdriver
    from selenium.webdriver.support.wait import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.common.by import By
    import json
    import random
    import resource
    import logging
    
    
    class QueryTicket:
        """Look up the trains between two cities on 12306 for a given date."""

        # Result keys for the seat-type columns, in the order their <td> cells
        # appear in a result row. They map to cells 1..11; cell 0 is skipped.
        _SEAT_COLUMNS = (
            "商务座特等座",
            "一等座",
            "二等座",
            "高级软卧",
            "软卧一等卧",
            "动卧",
            "硬卧二等卧",
            "软座",
            "硬座",
            "无座",
            "其它",
        )

        @classmethod
        def getRandomHeaders(cls):
            """Return one randomly chosen User-Agent string from ``resource.UserAgents``."""
            return random.choice(resource.UserAgents)

        def main(self, start_city, end_city, start_time):
            """Run the query in headless Chrome and return the scraped result.

            Args:
                start_city: Departure city name; must be a key of city_number.txt.
                end_city: Arrival city name; must be a key of city_number.txt.
                start_time: Travel date exactly as it is placed in the URL,
                    e.g. ``'2019-05-28'``.

            Returns:
                A dict with two keys: ``"describe"`` (text of the result
                summary paragraph) and ``"data"`` (list of per-train dicts).
                Either value stays ``None`` if the corresponding step was not
                reached. Failures are printed, not raised.
            """
            # Skeleton of the returned payload.
            result = {
                "describe": None,
                "data": None,
            }
            logging.captureWarnings(True)
            try:
                url = self.splicing_url(start_city, end_city, start_time)
            except Exception as e:
                print(e)
                return result
            if not url:
                # Unknown city: there is nothing to load, so no browser is started.
                return result

            options = webdriver.ChromeOptions()
            options.add_argument('--disable-infobars')
            options.add_argument('--start-maximized')
            # The method must be *called*; formatting the bound method itself
            # would send its repr as the User-Agent.
            options.add_argument('user-agent={}'.format(self.getRandomHeaders()))
            options.add_argument('--headless')
            # `options=` is the supported keyword; `chrome_options=` is
            # deprecated and rejected by recent Selenium releases.
            browser = webdriver.Chrome(options=options)
            try:
                browser.get(url)
                browser.implicitly_wait(10)
                # Wait until at least one result row is present in the table.
                first_row = (By.XPATH, "//tbody[@id='queryLeftTable']/tr")
                WebDriverWait(browser, 20, 0.5).until(EC.presence_of_element_located(first_row))
                # Summary line shown above the result table.
                result["describe"] = browser.find_element(By.XPATH, "//div[@id='sear-result']/p").text
                table = browser.find_element(By.XPATH, "//tbody[@id='queryLeftTable']")
                # One row per train.
                rows = table.find_elements(By.XPATH, "./tr[@class='bgc'] | tr[@class='']")
                result['data'] = [self._parse_row(row) for row in rows]
                return result
            except Exception as e:
                print(e)
                return result
            finally:
                browser.quit()

        def _parse_row(self, row):
            """Convert one result-table ``<tr>`` element into a dict of its text fields."""
            # Each list is fetched once and indexed, instead of re-querying the
            # browser for every single field.
            stations = row.find_elements(By.XPATH, ".//div[@class='cdz']/strong")
            times = row.find_elements(By.XPATH, ".//div[@class='cds']/strong")
            cells = row.find_elements(By.XPATH, ".//td")
            info = {
                "车次": row.find_element(By.XPATH, ".//a[@class='number']").text,
                "出发站": stations[0].text,
                "到达站": stations[1].text,
                "出发时间": times[0].text,
                "到达时间": times[1].text,
                "历时总时间": row.find_element(By.XPATH, ".//div[@class='ls']/strong").text,
                "历时是否当日到达": row.find_element(By.XPATH, ".//div[@class='ls']/span").text,
            }
            for offset, key in enumerate(self._SEAT_COLUMNS, start=1):
                info[key] = cells[offset].text
            return info

        def splicing_url(self, start_city, end_city, start_time):
            """Build the 12306 search URL for a route and date.

            City codes are read from ``city_number.txt`` in the current
            working directory.

            Returns:
                The URL string, or ``False`` when either city is not present
                in the city-code file.
            """
            with open("city_number.txt", encoding='utf-8', mode='r') as f:
                city_number_dict = json.load(f)
            # Both cities must be known. The previous
            # `start_city and end_city in ...` only tested the arrival city.
            if start_city not in city_number_dict or end_city not in city_number_dict:
                return False
            return 'https://kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc&fs={},{}&ts={},{}&date={}&flag=N,N,Y'.format(
                start_city, city_number_dict[start_city],
                end_city, city_number_dict[end_city],
                start_time)


    if __name__ == '__main__':
        st = QueryTicket()
        ret = st.main('上海', '武汉', '2019-05-28')
        print(ret)

    运行结果如下:

    image.png


    最后整合代码,使用django rest framework,实现一个基于post请求的查询接口

    版本信息:

    Django==2.2.1

    djangorestframework==3.9.4


    django  settings.py配置,主要是三个地方,第三个可选

    1 在 INSTALLED_APPS下配置rest_framework,如下:

    INSTALLED_APPS = [
        # Django's built-in applications
        "django.contrib.admin",
        "django.contrib.auth",
        "django.contrib.contenttypes",
        "django.contrib.sessions",
        "django.contrib.messages",
        "django.contrib.staticfiles",
        # The project's own application
        "app01.apps.App01Config",
        # Django REST framework
        "rest_framework",
    ]

    2 在最后添加上rest_framework的配置参数

    REST_FRAMEWORK = {
        # API versioning defaults
        "DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.URLPathVersioning",
        "DEFAULT_VERSION": "v1",
        "ALLOWED_VERSIONS": ["v1", "v2"],
        "VERSION_PARAM": "version",
        # Pagination
        "DEFAULT_PAGINATION_CLASS": "rest_framework.pagination.PageNumberPagination",
        "PAGE_SIZE": 1,  # default page size
        # Throttling: the custom throttle class and the rate for its "rate" scope
        "DEFAULT_THROTTLE_CLASSES": ["app01.utils.throttle.MyThrottle"],
        "DEFAULT_THROTTLE_RATES": {
            "rate": "1/s",
        },
        # Render responses as JSON only
        "DEFAULT_RENDERER_CLASSES": ("rest_framework.renderers.JSONRenderer",),
    }

    3 允许所有ip可访问,用于内网测试

    # Wildcard: accept requests for any Host header. Intended for intranet
    # testing only; list the real host names before exposing the service.
    ALLOWED_HOSTS = ["*"]


    django路由配置urls.py

    from django.contrib import admin
    from django.urls import path
    from django.conf.urls import url
    from app01.views import RestApiView
    
    urlpatterns = [
        path('admin/', admin.site.urls),
        # Match exactly "api" or "api/". The previous pattern r'^api?' made the
        # final "i" optional and was not anchored at the end, so any path
        # starting with "ap" was routed to the API view.
        url(r'^api/?$', RestApiView.as_view(), name='api'),
    ]


    django视图配置views.py

    from django.shortcuts import render
    from rest_framework.versioning import QueryParameterVersioning
    from rest_framework.views import APIView
    from rest_framework.response import Response
    from django.http import JsonResponse
    from app01.selenium import query_tick_12306
    import json, time, os
    # Create your views here.
    
    
    class RestApiView(APIView):
        """Ticket query endpoint: GET returns a fixed status, POST runs a 12306 query."""

        versioning_class = QueryParameterVersioning

        def dispatch(self, request, *args, **kwargs):
            """Delegate to ``APIView.dispatch``.

            Every request passes through ``dispatch``, which triggers the
            get/post/put/... handler that matches the HTTP method.
            """
            return super().dispatch(request, *args, **kwargs)

        def get(self, request, *args, **kwargs):
            """Return the fixed payload ``{"status": 200}``."""
            return JsonResponse({"status": 200})

        def post(self, request, *args, **kwargs):
            """Query tickets for the posted route and date.

            Reads ``start_city``, ``end_city`` and ``start_time``
            (``YYYY-MM-DD``) from the request body. An
            ``{"status": "error", "message": ...}`` payload is returned when a
            field is missing, the date is malformed or already past, or a city
            is not in ``selenium/city_number.txt``. Otherwise the search URL
            is built and the result of ``QueryTicket.main`` is returned as JSON.
            """
            start_city = self._clean(request.data.get('start_city'))
            end_city = self._clean(request.data.get('end_city'))
            start_time = self._clean(request.data.get('start_time'))
            # A missing field used to raise AttributeError on None.strip().
            if not (start_city and end_city and start_time):
                return JsonResponse({"status": "error", "message": "参数缺失"})

            # Validate and normalise the date to zero-padded YYYY-MM-DD. The
            # expiry check below is a string comparison and is only meaningful
            # in that format.
            try:
                parsed_date = time.strptime(start_time, '%Y-%m-%d')
            except ValueError:
                return JsonResponse({"status": "error", "message": "日期格式错误"})
            start_time = time.strftime('%Y-%m-%d', parsed_date)

            # Reject dates that are already in the past.
            if self.now_time() > start_time:
                return JsonResponse({"status": "error", "message": "过期时间"})

            base_dir = os.path.dirname(os.path.abspath(__file__))
            file_path = os.path.join(base_dir, 'selenium', 'city_number.txt')
            # Load the mapping and close the file before the slow browser run.
            with open(file_path, encoding='utf-8', mode='r') as f:
                city_number_dict = json.load(f)

            # Both cities must be present in the city-code mapping.
            if start_city not in city_number_dict or end_city not in city_number_dict:
                return JsonResponse({"status": "error", "message": "城市错误"})

            url = 'https://kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc&fs={},{}&ts={},{}&date={}&flag=N,N,Y'.format(
                start_city, city_number_dict[start_city],
                end_city, city_number_dict[end_city],
                start_time)
            ret = query_tick_12306.QueryTicket().main(url)
            return JsonResponse(ret)

        @staticmethod
        def _clean(value):
            """Return *value* as a stripped string; ``None`` becomes ``''``."""
            return '' if value is None else str(value).strip()

        def now_time(self):
            """Return today's local date formatted as ``YYYY-MM-DD``."""
            return time.strftime('%Y-%m-%d', time.localtime(time.time()))


    创建两个自定义文件夹selenium,utils,完整目录结构如下:

    image.png


    selenium目录下是爬虫的两个文件:一个是获取城市编号的python脚本(需要提前手动运行),另一个是获取车票信息的脚本

    get_12306_city.py  获取城市,以及城市编号,需提前手动运行下,会自动生成一个city_number.txt文件

    from selenium import webdriver
    import time
    import json
    
    
    class QueryTicket:
        """Scrape the hot-city station codes from the 12306 home page."""

        def main(self):
            """Collect hot-city names and codes and save them as JSON.

            Loads the 12306 index page in headless Chrome, reads the ``title``
            and ``data`` attributes of every ``<li>`` under
            ``ul.popcitylist`` (stored as city name -> city code), prints the
            mapping and writes it to ``city_number.txt`` in the current
            working directory.

            Errors are printed rather than raised, and the browser is always
            closed. Nothing is written when no city could be scraped, so an
            existing ``city_number.txt`` is never replaced by an empty mapping.
            """
            # Local import: the surrounding script only imports `webdriver`.
            from selenium.webdriver.common.by import By

            url = 'https://www.12306.cn/index/index.html'
            options = webdriver.ChromeOptions()
            options.add_argument('--disable-infobars')
            options.add_argument('--start-maximized')
            options.add_argument('--headless')
            # `options=` is the supported keyword; `chrome_options=` is
            # deprecated and rejected by recent Selenium releases.
            browser = webdriver.Chrome(options=options)
            city_number_dict = {}
            try:
                browser.get(url)
                browser.implicitly_wait(20)
                # Give the JS-rendered city list time to appear.
                time.sleep(3)
                # Locate the hot-city list items.
                elements = browser.find_elements(By.XPATH, "//ul[@class='popcitylist']/li")
                for item in elements:
                    city_name = item.get_attribute('title')
                    city_number = item.get_attribute('data')
                    # Skip list items that lack either attribute.
                    if city_name and city_number:
                        city_number_dict.setdefault(city_name, city_number)
                print(city_number_dict)
                if not city_number_dict:
                    print('no city codes found; city_number.txt left unchanged')
                    return
                with open("city_number.txt", encoding='utf-8', mode='w') as f:
                    f.write(json.dumps(city_number_dict, ensure_ascii=False))

            except Exception as e:
                print(e)
            finally:
                browser.quit()


    if __name__ == '__main__':
        st = QueryTicket()
        st.main()


    query_tick_12306.py 获取车票信息

    #!/usr/bin/env python
    # coding: utf-8
    
    from selenium import webdriver
    from selenium.webdriver.support.wait import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.common.by import By
    import random
    from app01.selenium import resource
    import logging
    
    
    class QueryTicket:
        """Scrape the train list from a 12306 left-ticket search page."""

        # Result keys for the seat-type columns, in the order their <td> cells
        # appear in a result row. They map to cells 1..11; cell 0 is skipped.
        _SEAT_COLUMNS = (
            "商务座特等座",
            "一等座",
            "二等座",
            "高级软卧",
            "软卧一等卧",
            "动卧",
            "硬卧二等卧",
            "软座",
            "硬座",
            "无座",
            "其它",
        )

        @classmethod
        def getRandomHeaders(cls):
            """Return one randomly chosen User-Agent string from ``resource.UserAgents``."""
            return random.choice(resource.UserAgents)

        def main(self, url):
            """Load *url* in headless Chrome and return the scraped result.

            Args:
                url: Fully built 12306 search URL.

            Returns:
                A dict with two keys: ``"describe"`` (text of the result
                summary paragraph) and ``"data"`` (list of per-train dicts).
                Either value stays ``None`` if the corresponding step was not
                reached. Failures are printed, not raised.
            """
            # Skeleton of the returned payload.
            result = {
                "describe": None,
                "data": None,
            }
            logging.captureWarnings(True)
            options = webdriver.ChromeOptions()
            options.add_argument('--disable-infobars')
            options.add_argument('--start-maximized')
            # The method must be *called*; formatting the bound method itself
            # would send its repr as the User-Agent.
            options.add_argument('user-agent={}'.format(self.getRandomHeaders()))
            options.add_argument('--headless')
            # `options=` is the supported keyword; `chrome_options=` is
            # deprecated and rejected by recent Selenium releases.
            browser = webdriver.Chrome(options=options)
            try:
                browser.get(url)
                browser.implicitly_wait(10)
                # Wait until at least one result row is present in the table.
                first_row = (By.XPATH, "//tbody[@id='queryLeftTable']/tr")
                WebDriverWait(browser, 20, 0.5).until(EC.presence_of_element_located(first_row))
                # Summary line shown above the result table.
                result["describe"] = browser.find_element(By.XPATH, "//div[@id='sear-result']/p").text
                table = browser.find_element(By.XPATH, "//tbody[@id='queryLeftTable']")
                # One row per train.
                rows = table.find_elements(By.XPATH, "./tr[@class='bgc'] | tr[@class='']")
                result['data'] = [self._parse_row(row) for row in rows]
                return result
            except Exception as e:
                print(e)
                return result
            finally:
                browser.quit()

        def _parse_row(self, row):
            """Convert one result-table ``<tr>`` element into a dict of its text fields."""
            # Each list is fetched once and indexed, instead of re-querying the
            # browser for every single field.
            stations = row.find_elements(By.XPATH, ".//div[@class='cdz']/strong")
            times = row.find_elements(By.XPATH, ".//div[@class='cds']/strong")
            cells = row.find_elements(By.XPATH, ".//td")
            info = {
                "车次": row.find_element(By.XPATH, ".//a[@class='number']").text,
                "出发站": stations[0].text,
                "到达站": stations[1].text,
                "出发时间": times[0].text,
                "到达时间": times[1].text,
                "历时总时间": row.find_element(By.XPATH, ".//div[@class='ls']/strong").text,
                "历时是否当日到达": row.find_element(By.XPATH, ".//div[@class='ls']/span").text,
            }
            for offset, key in enumerate(self._SEAT_COLUMNS, start=1):
                info[key] = cells[offset].text
            return info


    if __name__ == '__main__':
        pass


    utils里面有一个文件,接口访问限制

    throttle.py

    #!/usr/bin/env python
    # coding: utf-8
    """
    自定义的访问限制类
    """
    from rest_framework.throttling import SimpleRateThrottle
    
    class MyThrottle(SimpleRateThrottle):
        """Rate-limit requests per client identity under the ``"rate"`` scope."""

        # Scope label; the name is arbitrary but selects the entry with the
        # same key in DEFAULT_THROTTLE_RATES.
        scope = "rate"

        def get_cache_key(self, request, view):
            """Return the cache key under which this client's request history is stored.

            The key is built from ``cache_format`` with the scope and the
            client identity, so it is namespaced instead of being the bare
            identity string, which could collide with unrelated cache entries.
            """
            return self.cache_format % {
                'scope': self.scope,
                'ident': self.get_ident(request),
            }


    所有配置好后,运行django

    image.png


    用postman测试,没有安装的请自行下载..

    先测试过期的时间,

    image.png


    再测试下错误的城市,因为只获取了热门城市

    image.png


    最后再测试下正常的请求

    image.png


    后期还会做web可视化,以及登录抢票接口,未完待续.......

    完整代码github地址:https://github.com/py3study/12306_ticket_inquiry

    python3交流群:198447500


关键字