beibei_sum_spark(pyt

发布时间:2019-07-15 10:48:02编辑:auto阅读(1263)

    # -*-coding:utf8-*-#
    __author__ = 'hash'
    """
    create time:16/7/5 15:42
    """
    from datetime import datetime, timedelta
    # os.environ['SPARK_HOME'] = "/Users/play/software/spark"  # 绝对路径
    # sys.path.append("/Users/play/software/spark/python")
    # print os.environ['SPARK_HOME']
    from pyspark import SparkContext, SparkConf
    # Initialize SparkContext
    # sc = SparkContext('local')
    conf = SparkConf().setAppName("The GMV Sum BeiBei")
    conf.set('spark.logConf', False)
    sc = SparkContext(conf=conf)
    today_str='2016-07-05'
    today = datetime.strptime(today_str, "%Y-%m-%d")
    # today = datetime.today()
    today_str = today.strftime("%Y-%m-%d")
    # 指定日期,前一天
    ytday = today - timedelta(days=1)
    ytday_str = ytday.strftime("%Y-%m-%d")
    base_path = "hdfs://master:9000/super_db/"
    category = "beibei-com"
    # source_path_current='/Users/play/TEMP/7-4/log/2016-07-03/beibei-com_2016-07-03*'
    # source_path_current = '/Users/play/TEMP/7-4/log/test/test.txt'
    # source_path_next = '/Users/play/TEMP/7-4/log/2016-07-04/beibei-com_2016-07-04-00*'
    # yesterday
    # source_path_current = '/super_db/raw_db/' + category + '/' + category + '_item/2016/' + ytday_str + '/' + category + '_' + ytday_str + '*'
    # today
    # source_path_next = '/super_db/raw_db/' + category + '/' + category + '_item/2016/' + today_str + '/' + category + '_' + today_str + '-00*'
    #
    source_path_current = '/super_db/raw_db/beibei/beibei_item/2016/beibei_item_07_04.txt'
    source_path_next = '/super_db/raw_db/beibei/beibei_item/2016/beibei_item_07_05.txt'
    # target_data_path = '/Users/play/TEMP/7-4/data/rs.txt'
    # target_stat_path = '/Users/play/TEMP/7-4/stat/rs.txt'
    # target_cat_stat_path = '/Users/play/TEMP/7-4/stat_by_cat/rs.txt'
    target_stat_path = base_path + "analytics_db/" + category + "/" + category + "_item/2016/" + ytday_str + "_stat2"
    print target_stat_path
    # def filter_log_item(x):
    #     sp1 = x.split("#&#")
    #     if len(sp1) == 10:
    #         try:
    #             x1 = datetime.strptime(sp1[9], '%Y-%m-%d %H:%M:%S')
    #             return True
    #         except:
    #             return False
    #             # return True
    #     else:
    #         return False
    def filter_log_item2(x):
        sp1 = x.split("#&#")
        if len(sp1) == 28:
            try:
                x1 = datetime.strptime(sp1[-1], '%Y-%m-%d %H:%M:%S')
                return True
            except:
                return False
                # return True
        else:
            return False
    def get_some_data(x):
        sp1 = x.split("#&#")
        # if len(sp1)<10:return None
        id = sp1[0].strip()
        cat = sp1[2].strip()
        brand = sp1[3].strip()
        price = float(sp1[4])
        orig_price = float(sp1[5])
        sales = int(sp1[6])
        start_date = sp1[8].split(":")[0].strip()
        start_date1 = datetime.strptime(start_date, "%Y-%m-%d %H")
        price= 0 if price==-1 else price
        orig_price= 0 if orig_price==-1 else orig_price
        sales= 0 if sales==-1 else sales
        return ((id, start_date1), (start_date1, sales, price, cat, brand, orig_price))
    def get_some_data2(x):
        sp1 = x.split("#&#")
        # if len(sp1)<10:return None
        # id = sp1[0].strip()
        # cat = sp1[2].strip()
        # brand = sp1[3].strip()
        # price = float(sp1[4])
        # orig_price = float(sp1[5])
        # # sales = int(sp1[6])
        # start_date = sp1[8].split(":")[0].strip()
        # start_date1 = datetime.strptime(start_date, "%Y-%m-%d %H")
        id = sp1[3]
        start_date = sp1[27].split(":")[0].strip()
        start_date1 = datetime.strptime(start_date, "%Y-%m-%d %H")
        sales = sp1[8]
        price = sp1[6]
        orig_price = sp1[9]
        # stock = int(sp1[17])
        if "\N" == sales:sales = 0
        else:sales = int(sales)
        if "\N" == price:
            price = 0.0
        else:
            price = float(price)
        if "\N" == orig_price:
            orig_price = 0.0
        else:
            orig_price = float(orig_price)
        # return (id, (start_date, sales, stock, price))
        cat=""
        brand=""
        return ((id, start_date1), (start_date1, sales, price, cat, brand, orig_price))
    # 昨天销售数据
    yesterday_sales_rdd = sc.textFile(source_path_current).filter(filter_log_item2).map(get_some_data2)
    # ys1 = yesterday_sales_rdd.collect()
    # print ys1
    # print get_some_data(ys1)
    # 今天0点的数据
    today_sales_rdd = sc.textFile(source_path_next).filter(filter_log_item2).map(get_some_data2)
    # print today_sales_rdd.collect()
    # 每小时的商品数,只要有日志记录
    gbk = yesterday_sales_rdd.groupByKey()
    # from collections import Counter
    def couter(x, y):
        k1 = x.keys()
        k2 = y.keys()
        s1 = set(k1 + k2)
        d1 = dict()
        for h in s1:
            d1[h] = x.get(h, 0) + y.get(h, 0)
        return d1
    # hour_sale_num = gbk.keys().map(lambda dd: {dd[1].hour: 1}).reduce(lambda x, y: dict(Counter(x) + Counter(y)))
    hour_sale_num = gbk.keys().map(lambda dd: {dd[1].hour: 1}).reduce(lambda x, y: couter(x, y))
    print hour_sale_num
    # 在售商品个数
    spu_sum = yesterday_sales_rdd.count()
    #
    # s = '2016-07-04 00:15:02'
    # datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
    # 销售汇总,昨天和今天
    all_data = yesterday_sales_rdd.union(today_sales_rdd)
    # print 'all_data.first()',all_data.collect()
    sales_rdd = all_data.map(lambda x: (x[0][0], x[1]))
    print sales_rdd.first()
    grouped_sales_rdd = sales_rdd.groupByKey().mapValues(list)
    print grouped_sales_rdd.first()
    # start_date, sales, price, cat, brand, orig_price
    def date_dict(lst):
        d2 = dict()
        for i, x in enumerate(lst):
            #     print x
            #     print x[0].hour
            d2[x[0]] = x[1:]
        return d2
    # date_dict(sale_1[1])
    def sale_count24(dd):
        sale_list = date_dict(dd)
        #         print sale_list
        sort = sorted(sale_list)
        sales = []
        for x in reversed(range(24)):
            #     print x
            td1 = ytday + timedelta(hours=x)
            td2 = ytday + timedelta(hours=x - 1)  # 前1小时
            try:
                indx = sort.index(td1)
                td2 = sort[indx - 1]  # 列表的前一项
                s1 = int(sale_list[td1][0])
                p1 = float(sale_list[td1][1])
                s2 = int(sale_list[td2][0])
                p2 = float(sale_list[td2][1])
                sale = s1 - s2
                sale = sale if sale > 0 else 0
                money = s1 * p1 - s2 * p2
                money = money if money > 0 else 0
                sales.append((sale, money))
                continue
            except:
                # 不在列表内
                sale = 0
                money = 0.0
                sales.append((sale, money))
                continue
        return list(reversed(sales))  # 要倒置一下
    # gsr=grouped_sales_rdd.collect()
    # example1=gsr[1][1]
    # for x in example1:
    #     print x
    # print sale_count24(example1)
    hour_sales_rdd = grouped_sales_rdd.map(lambda (a, b): (a, sale_count24(b)))
    def sum2(a, b):
        if len(a) == 2:
            a = a[1]
        if len(b) == 2:
            b = b[1]
        su = []
        for x, y in zip(a, b):
            su.append((x[0] + y[0], x[1] + y[1]))
        return su
    hour_sales = hour_sales_rdd.reduce(lambda a, b: sum2(a, b))
    # print hour_sales
    # 每小时,累积销量,累积销售额
    num = [x[0] for x in hour_sales]
    money = [x[1] for x in hour_sales]
    sale_num_sum = sum(num)
    # sum(money) / float(len(money))
    sale_money_sum = sum(money)
    # SPU 每小时销售额>0的商品个数
    spu_sale = hour_sales_rdd.map(lambda a: [1 if x[0] > 0 else 0 for x in a]).reduce(
        lambda a, b: [x + y for x, y in zip(a, b)])
    print 'spu_sale', spu_sale, len(spu_sale)
    # 平均价格
    # yesterday_sales_rdd.first()
    # 取出现价
    now_price_rdd = yesterday_sales_rdd.map(lambda x: x[1][2])
    # now_price_rdd.first()
    # 取出原价
    org_price_rdd = yesterday_sales_rdd.map(lambda x: x[1][-1])
    # org_price_rdd.first()
    avg_price = now_price_rdd.reduce(lambda a, b: (a + b) / 2.0)
    avg_org_price = org_price_rdd.reduce(lambda a, b: (a + b) / 2.0)
    # 折扣
    discount = avg_price / avg_org_price
    # discount
    # SPU 每小时销售额>0的商品个数
    def sale_item_to_hour(x):
        idd = x[0]
        salist = x[1]
        hd = dict()
        for h in range(24):
            if salist[h][0] > 0:  # 有销量
                hd[h] = [idd]
        return hd
    def reduce_hour_dict_sum(x, y):
        #     print Counter(x),Counter(y)
        nd = dict()
        for h in range(24):
            nd[h] = x.get(h, []) + y.get(h, [])
        return nd
    # reduce_hour_dict_sum(ssc[6],ssc[10])
    spu_sale = hour_sales_rdd.map(sale_item_to_hour).reduce(lambda x, y: reduce_hour_dict_sum(x, y))
    ##每小时销售商品的个数
    spu_sale_num = dict()
    for h, lst in spu_sale.items():
        spu_sale_num[h] = len(lst)
    set1 = reduce(set.union, [set(x) for x in spu_sale.values()])
    spu_sale_sum = len(set1)  # 商品去重后的总数
    # json
    js = dict()
    for hour in range(24):
        sale_num = hour_sales[hour][0]
        sale_money = hour_sales[hour][1]
        spu = hour_sale_num.get(hour, 0)
        spu_sale_hour = spu_sale_num.get(hour, 0)
        js[hour] = {'sales': sale_num, 'gmv': sale_money, 'spu': spu, 'spu_saled': spu_sale_hour}
    # spu_sale_sum ,set 去重
    js['sum'] = {'sales_sum': sale_num_sum, 'gmv_sum': sale_money_sum,
                 'spu_sum': spu_sum, 'spu_saled_sum': spu_sale_sum,
                 'avg_price': avg_price, 'avg_org_price': avg_org_price, 'discount': discount
                 }
    print '----------------------------'
    print js
    """
    js={0: {'spu': 3, 'spu_saled': 2, 'sales': 4394, 'gmv': 1739302.0}, 1: {'spu': 3, 'spu_saled': 2, 'sales': 8048, 'gmv': 716863.0}, 2: {'spu': 3, 'spu_saled': 3, 'sales': 3, 'gmv': 798.0}, 3: {'spu': 3, 'spu_saled': 2, 'sales': -8046, 'gmv': -715563.0}, 4: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 5: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 6: {'spu': 3, 'spu_saled': 3, 'sales': 8057, 'gmv': 718666.0}, 7: {'spu': 3, 'spu_saled': 2, 'sales': -2480, 'gmv': -1687642.0}, 8: {'spu': 3, 'spu_saled': 2, 'sales': 560, 'gmv': 1632733.0}, 9: {'spu': 3, 'spu_saled': 1, 'sales': -8611, 'gmv': -2349332.0}, 10: {'spu': 3, 'spu_saled': 2, 'sales': 8055, 'gmv': 716835.0}, 11: {'spu': 3, 'spu_saled': 1, 'sales': -7496, 'gmv': 916460.0}, 12: {'spu': 3, 'spu_saled': 1, 'sales': 5571, 'gmv': -972225.0}, 13: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 14: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 15: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 16: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 17: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 18: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 19: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 20: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 21: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 22: {'spu': 3, 'spu_saled': 0, 'sales': -8055, 'gmv': -716895.0}, 23: {'spu': 0, 'spu_saled': 0, 'sales': 0, 'gmv': 0.0}, 'sum': {'discount': 0.639766081882126, 'gmv_sum': 0.0, 'avg_org_price': 244.28571429350995, 'sales_sum': 0, 'avg_price': 156.28571429333533, 'spu_sum': 36, 'spu_saled_sum': 3}}
    """
    # 保存
    # stat_rdd = sc.parallelize(json.dumps(str(js),ensure_ascii=False),numSlices=1)
    stat_rdd = sc.parallelize([str(js)])
    # stat_rdd.saveAsTextFile(target_stat_path)
    # stat_rdd.repartition(1).saveAsTextFile(target_stat_path)
    # stat_rdd.map(json.dumps).saveAsPickleFile(target_stat_path)
    stat_rdd.repartition(1).saveAsTextFile(target_stat_path)
    print  'Saved:', target_stat_path
    # from pyspark import SQLContext
    # languagesDF= SQLContext.load(js)
    # languagesDF.write.json(target_stat_path)

关键字

上一篇: 005-Python 变量类型

下一篇: pip install MySQL-py