模拟实现一个ATM + 购物商城程序
1.额度 15000或自定义
4.支持多账户登录
5.支持账户间转账
9.提供管理接口,包括添加账户、用户额度,冻结账户等。。。
10.用户认证用装饰器
3.可以提现,手续费5%
2.实现购物商城,买东西加入 购物车,调用信用卡接口结账
6.记录每月日常消费流水
7.提供还款接口
8.ATM记录操作日志
包含随时返回:b 和随时退出:q
ps:使用面向过程的思想去实现
画的贼烂,框架如上图
ATM+购物车 |-------conf 配置加接口 | |------interface.py 接口配置 | |------settings.py 路径加日志配置 | |-------core 核心 用户与超管 | |------admin.py 超管功能 | |------atm.py ATM功能 | |------shopping.py 购物功能 | |-------db 数据库 | |------userdb 用户文件 json格式 | |------dbhandler.py 数据层操作 | |-------lib 公共库 | |------common.py 用户认证加日志功能 | |-------log 日志文件夹 | |------operation.log 操作日志 | |------trading.log 交易日志 |-------start.py 程序启动文件
#coding:utf-8 from db import dbhandler from lib import common logger1=common.get_logger("操作日志") logger2=common.get_logger('交易日志') def add_user(name,pwd='123',lines=15000): '''添加账户接口''' user_dict={'name':name,'password':pwd,'lines':lines,'balance':lines,'buy':[],'lock':False} state,msg=dbhandler.query(name) if state: return False,'用户已存在' else: dbhandler.save(user_dict) logger1.info('%s用户创建成功' % name) return True, '%s用户创建成功,密码为%s' % (name,pwd) def trans_interface(my_name,trans_name,money): '''转账接口''' state_my,msg_my=dbhandler.query(my_name) state_tra,msg_tra=dbhandler.query(trans_name) if state_tra: if int(money) < int(msg_my['balance']): msg_my['balance']=int(msg_my['balance'])-int(money) msg_tra['balance']=int(msg_tra['balance'])+int(money) dbhandler.save(msg_my) dbhandler.save(msg_tra) logger2.info('%s转账给%s%s元成功'%(my_name,trans_name,money)) return True,'%s转账给%s%s元成功'%(my_name,trans_name,money) else: return False,'转账失败,余额不足' else: return False,'转账用户不存在' def withdrawal_interface(name,money): '''提现接口''' state,msg=dbhandler.query(name) poundage=int(money)*5/100 if msg['balance'] < (int(money)+poundage): return False,'提现失败,余额不足' else: msg['balance']=msg['balance'] - int(money) - poundage dbhandler.save(msg) logger2.info('%s提现%s元成功,手续费为%s元'%(name,money,poundage)) return True,'%s提现%s元成功,手续费为%s元'%(name,money,poundage) def repay_interface(name,money): '''还款接口''' state,msg=dbhandler.query(name) if state: msg['balance']=msg['balance']+int(money) dbhandler.save(msg) logger2.info('%s还款%s元成功,余额为%s'%(name,money,msg['balance'])) return True,'%s还款%s元成功,余额为%s'%(name,money,msg['balance']) def check_balance(name): '''查看余额接口''' state,user_dict=dbhandler.query(name) res=user_dict['balance'] return res def shopping_interface(name,cost,shopping_dict): '''购物支付接口''' state,user_dict=dbhandler.query(name) if user_dict['balance'] > cost: user_dict['balance']-=cost user_dict['buy'].append(shopping_dict) dbhandler.save(user_dict) logger2.info('%s购买成功,消费了%s元'%(name,cost)) return True,'购买成功,消费了%s元'%cost else: return False,'余额不足,购买失败' def check_shop_car(name): '''查看购物记录即可''' state,user_dict=dbhandler.query(name) res=user_dict['buy'] return res
# coding:utf-8 # ============================路径配置加日志配置项(日志配置模板)========================================= import os BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) USER_DB = os.path.join(BASE_DIR, 'db', 'userdb') standard_format = '[%(asctime)s][task_id:%(name)s]' \ '[%(levelname)s]< %(message)s >' # 其中name为getlogger指定的名字 simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s' id_simple_format = '[%(asctime)s][%(levelname)s] %(message)s' # log文件的路径 OPE_LOG_PATH = os.path.join(BASE_DIR, 'log', 'operation.log') # 操作日志路径 TRA_LOG_PATH = os.path.join(BASE_DIR, 'log', 'trading.log') # 交易日志路径 # log配置字典 LOGGING_DIC = { 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'standard': { 'format': standard_format }, 'simple': { 'format': id_simple_format }, }, 'filters': {}, 'handlers': { # 打印到终端的日志,使用的格式为 simple_format 'ch': { 'level': 'DEBUG', 'class': 'logging.StreamHandler', # 打印到屏幕 'formatter': 'simple' }, # 打印到文件的日志,使用的格式为 standard_format 'default1': { 'level': 'DEBUG', 'class': 'logging.FileHandler', # 保存到文件 'formatter': 'standard', 'filename': OPE_LOG_PATH, # 日志文件 'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了 }, 'default2': { 'level': 'DEBUG', 'class': 'logging.FileHandler', # 保存到文件 'formatter': 'standard', 'filename': TRA_LOG_PATH, # 日志文件 'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了 }, }, 'loggers': { '操作日志': { 'handlers': ['ch', 'default1'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕 'level': 'DEBUG', 'propagate': False, }, '交易日志': { 'handlers': ['ch', 'default2'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕 'level': 'DEBUG', 'propagate': False, }, }, }
# coding:utf-8 from conf import interface from db import dbhandler from lib import common tag = True def adduser(): '''添加用户''' while True: name = input('请输入账户名:').strip() if not name: continue name1 = input('请确认账户名:').strip() if name != name1: print('两次输入不一致!') continue state, msg = interface.add_user(name1) if state: print(msg) break else: print(msg) def set_lines(): '''设置额度''' while True: name = input('请输入账户名:').strip() if not name: continue name1 = input('请确认账户名:').strip() if name != name1: print('两次输入不一致!') continue lines = input('请输入信用额度:').strip() if lines.isdigit(): state, msg = interface.dbhandler.query(name1) if state: msg['lines'] = lines dbhandler.save(msg) interface.logger1.info('%s设置%s成功,信用额度为%s!!' % ('admin', name1, lines)) print('%s设置%s成功,信用额度为%s!!' % ('admin', name1, lines)) else: print(msg) else: print('额度请输入数字') break def freeze(): '''冻结功能''' name = input('请输入冻结账户名:').strip() name1 = input('请确认冻结账户名:').strip() if name != name1: print('两次输入不一致!') return state, msg = dbhandler.query(name1) if state: msg['lock'] = True dbhandler.save(msg) interface.logger1.info('%s冻结%s成功' % ('admin', name1)) print('%s冻结%s成功' % (common.user_state['name'], name1)) else: print(msg) def unfreeze(): '''解冻功能''' name = input('请输入冻结账户名:').strip() name1 = input('请确认冻结账户名:').strip() if name != name1: print('两次输入不一致!') return state, msg = dbhandler.query(name1) if state: msg['lock'] = False dbhandler.save(msg) interface.logger1.info('%s解冻%s成功' % ('admin', name1)) print('%s解冻%s成功' % ('admin', name1)) else: print(msg) def check_log(): '''查看操作日志功能''' dbhandler.query_log('admin') def menu(): view = ''' 1.添加账户 2.设置用户额度 3.冻结账户 4.解冻账户 5.查看操作日志 q.退出 ''' print(view) @common.login_auth(auth='admin') def main(): view_dict = { '1': adduser, '2': set_lines, '3': freeze, '4': unfreeze, '5': check_log } global tag while tag: print('管理员'.center(60, '-')) menu() choice = input('请选择:').strip() if choice == 'q': tag = False break if not choice or choice not in view_dict: continue view_dict[choice]()
# coding:utf-8 from core import admin from conf import interface from db import dbhandler from lib import common def registered(): '''注册功能''' if common.user_state['name']: print('已登录,无法注册') return while True: print('注册'.center(40, '-')) name = input('请输入账户名:').strip() if not name: continue pwd1 = input('请输入密码:').strip() if not pwd1: continue pwd2 = input('请确认密码:').strip() if pwd1 != pwd2: print('两次输入密码不一致!') break if len(pwd2) < 3: print('密码强度太低') continue state, msg = interface.add_user(name, pwd2) if state: print(msg) break else: print(msg) def check_balance(): '''查看余额''' balance = interface.check_balance(common.user_state['name']) print('%s当前余额为%s元!!!' % (common.user_state['name'], balance)) def transfer(): '''转账''' while True: print('转账'.center(40, '-')) name = input('请输入要转账的账户名:').strip() name1 = input('请确认账户名:').strip() if name != name1: print('账户名不一致') continue money = input('请输入转账金额:'.strip()) if money.isdigit(): state, msg = interface.trans_interface(common.user_state['name'], name1, money) if state: print(msg) break else: print(msg) break else: print('金额请输入数字!') continue def repay(): '''还款''' state, msg = dbhandler.query(common.user_state['name']) arrears = msg['lines'] - msg['balance'] print('您的欠款为%s元!!!' % (arrears)) repay_wage = input('请输入您要还款的金额: ').strip() if not repay_wage.isdigit(): print('请输入数字!') return else: state, msg = interface.repay_interface(common.user_state['name'], repay_wage) print(msg) def withdrawal(): '''提现''' while True: print('提现'.center(40, '-')) money = input('请输入要提现的金额:').strip() if not money.isdigit(): continue message = input('提现将要收取%5手续费,是否继续(y/n):').strip() if message not in ['y', 'n']: continue if message == 'y': state, msg = interface.withdrawal_interface(common.user_state['name'], money) if state: print(msg) break else: print(msg) else: break def check_log(): '''查看操作日志''' dbhandler.query_log(common.user_state['name']) def view(): view_dict = ''' 1.查询余额 2.转账 3.还款 4.提现 5.查询日志 b.返回上一层 q.退出 ''' print(view_dict) @common.login_auth(auth='user') def main(): view_dict = { '1': check_balance, '2': transfer, '3': repay, '4': withdrawal, '5': check_log } while admin.tag: print('ATM操作界面'.center(60, '-')) view() choice = input('请选择:').strip() if choice == 'q': admin.tag = False break if choice == 'b': break if choice not in view_dict: continue view_dict[choice]()
# coding:utf-8 from lib import common from core import admin from conf import interface shopping_dict = {} # 购物车 def product(): '''购物功能''' product_list = [ ['华为P20', 5488], ['笔记本', 5599], ['Nike运动鞋', 669], ['十三香小龙虾', 165], ['山地车', 399], ['天梭表', 3699], ['某宝t恤', 69], ['三只松鼠', 59], ['重庆小面', 18], ] cost = 0 # 初始总花费为0 user_balance = interface.check_balance(common.user_state['name']) # 调接口查询余额 global shopping_dict while admin.tag: print('商品列表'.center(50, '-')) for k, v in enumerate(product_list): # 将商品列表 加上索引打印出来 print(k, '', v) print('q : 退出/购买 ') choice = input('\n请选择购买商品:').strip() if choice.isdigit(): # 判断输入是否为数字 choice = int(choice) if choice >= len(product_list): continue # 输入的数字大于索引则重新开始 product_name = product_list[choice][0] # 拿到商品名 product_price = product_list[choice][1] # 拿到商品价格 if user_balance >= product_price: # 判断余额是否大于商品价格 if product_name in shopping_dict: # 判断商品是否已在购物车里 shopping_dict[product_name]['数量'] += 1 # 有则购物车的对应商品数量加1 else: shopping_dict[product_name] = {'价格': product_price, '数量': 1} # 购物车添加新字典 user_balance -= product_price # 余额减去商品价格 cost += product_price # 花费加上商品价格 print('%s加入购物车成功' % [product_name, product_price]) else: print('余额不足') elif choice == 'q': if cost == 0: break for k, v in shopping_dict.items(): print(k, '', v) buy = input('是否确认购买(y/n),需支付%s元:' % cost).strip() if buy == 'y': # 确认支付时调支付接口 state, msg = interface.shopping_interface(common.user_state['name'], cost, shopping_dict) if state: print(msg) break else: print(msg) break else: print('无任何商品购买') shopping_dict = {} break else: print('输入错误') continue def shop_car(): '''查看购物车功能''' for k, v in shopping_dict.items(): print(k, '', v) def check_shop(): '''查看消费记录功能''' shop_dic = interface.check_shop_car(common.user_state['name']) for i in shop_dic: print(i) def view(): view_dict = ''' 1.商品列表 2.购物车 3.查看消费记录 b.返回上一层 q.退出 ''' print(view_dict) @common.login_auth(auth='user') def main(): view_dict = { '1': product, '2': shop_car, '3': check_shop, } while admin.tag: print('购物界面'.center(60, '-')) view() choice = input('请选择:').strip() if choice == 'q': admin.tag = False break if choice == 'b': break if choice not in view_dict: continue view_dict[choice]()
#coding:utf-8 import json,os from conf import settings def save(user_dic): '''数据写入''' name=user_dic['name'] with open(settings.USER_DB+r'\%s.json'%name,'wt') as f: json.dump(user_dic,f) f.flush() def query(name): '''查询用户信息''' db_path=settings.USER_DB+r'\%s.json'%name if os.path.isfile(db_path): with open(db_path,'rt') as f: res=json.load(f) return True,res else: return False, '用户不存在' def query_log(name): '''查询日志功能''' with open(settings.OPE_LOG_PATH,'rt',encoding='utf-8') as f: for names in f.readlines(): if name in names: print(names) with open(settings.TRA_LOG_PATH,'rt',encoding='utf-8') as f1: for names in f1.readlines(): if name in names: print(names)
#:coding:utf-8 user_state = {'name': None} from conf import settings import logging.config from db import dbhandler def login_user(): '''认证''' count = 0 while True: name = input('请输入用户名:').strip() pwd = input('请输入密码:').strip() state, msg = dbhandler.query(name) if count == 3: print('您已被锁定') msg['lock'] = True dbhandler.save(msg) break if state: # 查询状态 if pwd == msg['password'] and not msg['lock']: user_state['name'] = name print('%s登录成功' % name) break elif msg['lock'] == True: print('您已被锁定') break else: count += 1 print('密码错误%s次' % count) else: print(msg) break def login_admin(): '''管理员认证''' admin_dict = {'name': 'admin', 'password': '123', 'lock': False} name = input('请输入管理员账户:').strip() pwd = input('请输入密码:').strip() if name != admin_dict['name'] or pwd != admin_dict['password']: print('账号或密码错误') else: print('登录成功') user_state['name'] = name def login_auth(auth='module'): def outter(func): '''装饰器''' def warppers(*args, **kwargs): if auth == 'user': if user_state['name'] == None: login_user() if user_state['name']: func(*args, **kwargs) if auth == 'admin': if user_state['name'] == None: login_admin() if user_state['name'] == 'admin': func(*args, **kwargs) else: print('没有管理员权限!') return return warppers return outter def get_logger(name): '''日志模块''' logging.config.dictConfig(settings.LOGGING_DIC) logger = logging.getLogger(name) return logger
# coding:utf-8 # ================================start.py======================================= import sys, os BASE_DIR = os.path.dirname(__file__) sys.path.append(BASE_DIR) from core import atm, shopping, admin def menu(): view = ''' 1.注册 2.ATM 3.购物 4.管理员 q.退出 ''' print(view) if __name__ == '__main__': view_dict = { '1': atm.registered, '2': atm.main, '3': shopping.main, '4': admin.main } while admin.tag: print('总界面'.center(60, '-')) menu() choice = input('请选择:').strip() if choice == 'q': break if not choice or choice not in view_dict: continue view_dict[choice]()
花了点时间将之前的面向过程编程作业重写了次,完成后感觉还是写的不够好。时间原因就先这样。
2018-09-05,16:03:47