前段時間,公司有任務(wù),要求寫一個同步數(shù)據(jù)的腳本。大致是由于數(shù)據(jù)更新過于頻繁,因此以接口形式讀取數(shù)據(jù),隨時追加到本地數(shù)據(jù)庫,便于甲方爸爸查詢。
import requests
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
from urllib.parse import quote_plus as urlquote
from sqlalchemy import Table
#數(shù)據(jù)庫配置
class BaseConfig:
DRIVER = 'mysql'
USER = 'root'
PASSWORD = 'Admin@123'
SERVER = '127.0.0.1'
PORT = '3306'
DATABASE = 'USER_PROFILE'
SQLALCHEMY_DATABASE_URI = '{}+pymysql://{}:{}@{}:{}/{}?charset=utf8' \
.format(DRIVER, USER, urlquote(PASSWORD), SERVER, PORT, DATABASE)
class InsertData:
#需要同步的表
table_list = {
'01': 'MOBILE_USER_PROFILE',
'02': 'INTERNET_USER_PROFILE',
'03': 'TERMINAL_BRAND_PROFILE',
'04': 'ACTIVITE_INTERNET_PROFILE',
'05': 'ACTIVITE_PHONE_PROFILE',
'06': 'FREQUENCY_PHONE_PROFILE',
'07': 'FAMILY_USER_PROFILE',
'08': 'JOB_USER_PROFILE',
'09': 'DETAIL_USER_PROFILE',
'10': 'CAR_USER_PROFILE',
'11': 'SHORT_VIDEO_PROFILE',
'12': 'LONG_VIDEO_PROFILE',
'13': 'LIVE_VIDEO_PROFILE',
'14': 'LOCATION_USER_PROFILE',
'15': 'COMMUTE_USER_PROFILE',
'16': 'FINANCE_USER_PROFILE',
'17': 'HOUSING_USER_PROFILE',
'18': 'BUSINESS_TRIP_PROFILE',
'19': 'VIDEO_CONTENT_PROFILE',
'20': 'FOOD_APP_PROFILE',
'21': 'GRAVIDA_USER_PROFILE',
'22': 'MUSIC_USER_PROFILE',
'23': 'SPORT_USER_PROFILE',
'24': 'MOVIE_USER_PROFILE',
'25': 'NEWS_USER_PROFILE',
'26': 'FORUM_USER_PROFILE'
}
#product_id接口ID,mobile接口參數(shù),main_url接口URL。格式為:http://main_url/product_id/?mobile=xxx
def __init__(self, product_id, mobile, main_url):
self.product_id = product_id
self.mobile = mobile
self.main_url = main_url
#連接數(shù)據(jù)庫
def connect_database(self):
engine = create_engine(BaseConfig.SQLALCHEMY_DATABASE_URI, echo=False)
metadata = MetaData(engine)
connector = engine.connect()
session = sessionmaker(bind=engine)()
return connector, metadata, session
#讀取數(shù)據(jù)。這里先判斷手機號是否存在
def get_data(self):
table_name = self.table_list.get(self.product_id)
data_dict = {}
if table_name is None:
print('系統(tǒng)錯誤,請核查product_id')
else:
#傳參數(shù)讀取json數(shù)據(jù)
url = self.main_url + '{}'.format(self.product_id)
params = {'mobile': self.mobile}
response = requests.get(url, params)
#如果查到,返回200.查不到則返回404。
if response.status_code == 200:
data_dict = response.json()
return data_dict
#追加數(shù)據(jù)之前先檢查
def check_data(self):
data = self.get_data()
mobile_number = data.get('MOBILE_NUMBER')
table_name = self.table_list.get(self.product_id)
if not mobile_number:
print('沒有查到相關(guān)信息')
else:
#ORM只需要傳入鍵值對(比如,字典)即可。省去了拼接SQL代碼的過程。
cursor, metadata, session = self.connect_database()
orm_table = Table(table_name, metadata, autoload=True)
count = session.query(orm_table).filter_by(MOBILE_NUMBER=mobile_number).first()
if count is None:
#為避免重復(fù),如果有數(shù)據(jù)則不再追加。
data_dict = self.get_data()
insertor = orm_table.insert()
cursor.execute(insertor, data_dict)
print('數(shù)據(jù)已插入')
else:
print('數(shù)據(jù)已存在')
#測試
mobile = 'C17C402C57E170209975AA3DDE64368C'
product_id = '11'
main_url = 'http://127.0.0.1:5000/products/'
x = InsertData(product_id, mobile, main_url)
x.check_data()