Python Flask定时调度疫情大数据爬取全栈项目实战使用-8.存储腾讯的数据

python-flask xuhss 790℃ 0评论

存储腾讯数据

1.安装数据库

参考教程安装mysql:

https://xuhss.com/sql/centos7

https://xuhss.com/sql/windows

2.创建数据库

打开cmd,切换到这个目录:

cd C:\software\mysql8.0.23\bin

登录mysql:

mysql -h35.241.84.84  -uroot -p你的密码
create database cov;

show databases;

use cov;
CREATE TABLE history(
    ds datetime NOT NULL COMMENT '日期',
    confirm int(11) DEFAULT NULL COMMENT '累计确诊',
    confirm_add int(11) DEFAULT NULL COMMENT '当日新增确诊',
    suspect int(11) DEFAULT NULL COMMENT '剩余疑似',
    suspect_add int(11) DEFAULT NULL COMMENT '当日新增疑似',
    heal int(11) DEFAULT NULL COMMENT '累计治愈',
    heal_add int(11) DEFAULT NULL COMMENT '当日新增治愈',
    dead int(11) DEFAULT NULL COMMENT '累计死亡',
    dead_add int(11) DEFAULT NULL COMMENT '当日新增死亡',
    PRIMARY KEY (ds) USING BTREE
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE details(
    id int(11) NOT NULL AUTO_INCREMENT,
    update_time datetime DEFAULT NULL COMMENT '数据最后的更新时间点',
    province varchar(50) DEFAULT NULL COMMENT '省',
    city varchar(50) DEFAULT NULL COMMENT '市',
    confirm int(11) DEFAULT NULL COMMENT '累计确诊',
    confirm_add int(11) DEFAULT NULL COMMENT '新增确诊',
    heal int(11) DEFAULT NULL COMMENT '累计治愈',
    dead int(11) DEFAULT NULL COMMENT '累计死亡',
    PRIMARY KEY (id)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
desc history;

3.Python 连接数据库

安装pymysql

打开cmd

pip install pymysql
  1. 建立连接
  2. 创建游标
  3. 执行操作
  4. 关闭连接

实例操作-查询数据库

import pymysql
import time
import json
import traceback #异常处理

#建立连接
conn = pymysql.connect(host="35.241.84.84",
                      user='root',
                      password='123qweasdzxc',
                      db='cov')
#创建游标
cursor = conn.cursor()
sql="select * from history"
cursor.execute(sql)

#执行操作
res = cursor.fetchall()
print(res)

cursor.close()
conn.close()

实例操作-插入数据

import pymysql
import time
import json
import traceback #异常处理

#建立连接
conn = pymysql.connect(host="35.241.84.84",
                      user='root',
                      password='123qweasdzxc',
                      db='cov')
#创建游标
cursor = conn.cursor()
sql="insert into history values(%s,%s,%s,%s,%s,%s,%s,%s,%s)"

cursor.execute(sql,[time.strftime("%Y-%m-%d, %H:%M:%S"),10,1,2,3,4,5,6,7])
conn.commit() #提交事务
#执行操作
#res = cursor.fetchall()
#print(res)

cursor.close()
conn.close()

4.结合腾讯数据

import requests
import json
import time
import traceback #异常处理

def get_conn():
    #创建连接
    conn = pymysql.connect(host="35.241.84.84",
                      user='root',
                      password='123qweasdzxc',
                      db='cov')
    #创建游标
    cursor = conn.cursor()

    return conn, cursor

def close_conn(conn, cursor):
    if cursor:
        cursor.close()
    if conn:
        conn.close()

def get_tencent_data():
    """
    :return:返回历史数据和当日详细数据
    """
    header = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36"
    }
    url = "https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5"
    r = requests.get(url, headers = header)
    res = json.loads(r.text)#  json字符串转换为字典
    data_all = json.loads(res['data'])
    updateTime = data_all["lastUpdateTime"]

    history = {} #历史数据
    chinaTotal =  data_all["chinaTotal"]
    confirm = chinaTotal["confirm"]
    suspect = chinaTotal["suspect"]
    heal = chinaTotal["heal"]
    dead = chinaTotal["dead"]
    history[updateTime] = {"confirm":confirm, "suspect":suspect, "heal":heal, "dead":dead}

    chinaAdd =  data_all["chinaAdd"]
    confirmAdd = chinaTotal["confirm"]
    suspectAdd = chinaTotal["suspect"]
    healAdd = chinaTotal["heal"]
    deadAdd = chinaTotal["dead"]
    history[updateTime].update({"confirm_add":confirmAdd, "suspect_add":suspectAdd, "heal_add":healAdd, "dead_add":deadAdd})
    print(history)

    details  = [] #当日详细数据
    update_time = data_all["lastUpdateTime"]
    data_country = data_all["areaTree"] # 国家 只有一个中国 没有其他国家
    data_province = data_country[0]["children"] # 中国各省
    for pro_infos in data_province:
        province = pro_infos["name"] # 省名
        for city_infos in pro_infos["children"]:
            city = city_infos["name"] #
            confirm = city_infos["total"]["confirm"]
            confirm_add = city_infos["today"]["confirm"]
            heal = city_infos["total"]["heal"]
            dead = city_infos["total"]["dead"]     
            details.append([update_time, province, city, confirm, confirm_add, heal, dead])
            #print([update_time, province, city, confirm, confirm_add, heal, dead])
    return history, details

def update_details():
    """
    :更新详情表
    """
    cursor = None
    conn = None
    try:
        li = get_tencent_data()[1] #0是历史数据字典 1是最新详细数据列表
        conn, cursor = get_conn()
        sql = "insert into details(update_time,province,city,confirm,confirm_add,heal,dead) values(%s,%s,%s,%s,%s,%s,%s)"
        sql_query = "select %s=(select update_time from details order by id desc limit 1)" #对比当前最大时间戳
        cursor.execute(sql_query, li[0][0])
        if not cursor.fetchone()[0]:
            print(f"{time.asctime()}开始更新最新的数据操作")
            for item in li:
                cursor.execute(sql, item)
            conn.commit() #提交事务 update delete insert操作
            print(f"{time.asctime()}更新最新数据操作完毕")
        else:
            print(f"{time.asctime()}一是最新数据")
    except:
        traceback.print_exc()
    finally:
        close_conn(conn, cursor)

def insert_hostory():
    """
    :插入历史数据
    """
    cursor = None
    conn = None
    try:
        dic = get_tencent_data()[0] #0是历史数据字典 1是最新详细数据列表
        print(f"{time.asctime()}开始插入历史数据操作")
        conn, cursor = get_conn()
        sql = "insert into history values(%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        for k,v in dic.items():
            #itm 格式{'2021-03-03':{'confirm':41,'suspect':0, 'heal':0,'dead':0}}
            cursor.execute(sql, [k, v.get("confirm"),v.get("confirm_add"),v.get("suspect"),v.get("suspect_add"),v.get("heal"),v.get("heal_add"),v.get("dead"),v.get("dead_add")])

        conn.commit() #提交事务 update delete insert操作
        print(f"{time.asctime()}插入历史数据操作完毕")
    except:
        traceback.print_exc()
    finally:
        close_conn(conn, cursor)

def update_history():
    """
    :更新历史数据
    """
    cursor = None
    conn = None
    try:
        dic = get_tencent_data()[0] #0是历史数据字典 1是最新详细数据列表
        print(f"{time.asctime()}开始更新历史数据操作")
        conn, cursor = get_conn()
        sql = "insert into history values(%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        sql_query = "select confirm from history where ds=%s"
        for k,v in dic.items():
            #itm 格式{'2021-03-03':{'confirm':41,'suspect':0, 'heal':0,'dead':0}}
            if not cursor.execute(sql_query, k):
                cursor.execute(sql, [k, v.get("confirm"),v.get("confirm_add"),v.get("suspect"),v.get("suspect_add"),v.get("heal"),v.get("heal_add"),v.get("dead"),v.get("dead_add")])

        conn.commit() #提交事务 update delete insert操作
        print(f"{time.asctime()}历史数据更新操作完毕")
    except:
        traceback.print_exc()
    finally:
        close_conn(conn, cursor)

update_history()

转载请注明:xuhss » Python Flask定时调度疫情大数据爬取全栈项目实战使用-8.存储腾讯的数据

喜欢 (4)

您必须 登录 才能发表评论!