一、数据源
2019新型冠状病毒疫情时间序列数据仓库
本项目为2019新型冠状病毒(COVID-19/2019-nCoV)疫情状况的时间序列数据仓库,数据来源为丁香园。
链接
https://github.com/BlankerL/DXY-COVID-19-Data https://github.com/BlankerL/DXY-COVID-19-Data/releases/tag/2022.01.04
二、数据预处理
CSV->TXT
使用pandas进行数据预处理, 在世界范围内的数据中筛选出中国的数据
import pandas as pd
# .csv->.txt
data = pd.read_csv('/home/hadoop/datasets/overall_covid/DXYArea.csv')
with open('/home/hadoop/datasets/overall_covid/DXYArea_province.txt', 'a+', encoding='utf-8') as f:
for line in data.values:
tmp = ""
if str(line[2]) == "中国":
for i in line:
tmp = tmp + str(i) + '\t'
tmp = tmp + '\n'
f.write(tmp)
以下为基础数据结构
等处理好基础文件格式后,将文件上传到HDFS文件系统。之所以要上传到HDFS是因为spark on yarn模式下会进行分布式计算,要使用分布式计算就必须使用分布式文件系统,因此统一上传到HDFS中。
处理日期格式
创建RDD的过程中可以使用时间类datetime,这是我们可以对时间进行加减和比较,方便进行后续计算。
def toDate(inputStr):
date = datetime.strptime(inputStr, "%Y-%m-%d %H:%M:%S")# 年-月-日 时:分:秒
return date
建立spark连接
Spark RDD 编程的程序执行入口是SparkContext对象(不论何种编程语言) 只有,构建出SparkContext, 基于它才能执行后续的API调用和计算 本质上, SparkContext对编程来说, 主要功能就是创建第一个RDD出来
# 返回执行应用名称为spark_app的SparkContext对象
def connect_to_spark(AppName="spark_app"):
conf = SparkConf().setAppName(AppName)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
return spark
TXT->RDD->DataFrame
RDD的创建主要有2种方式:
- 通过并行化集合创建 ( 本地对象转分布式RDD )
- 读取外部数据源 ( 读取文件 )
这里我们选择读取HDFS上的txt文件创建RDD,并从RDD创建DataFrame
# 从HDFS的txt文件中获取RDD
def get_dataframe_from_txt(hdfsurl, spark: SparkSession):
# "hdfs://Master:9000/user/hadoop/covid/DXYArea_China.txt"
# 创建RDD模式
fields = [StructField("continentName", StringType(), False), # 洲名称
StructField("continentEnglishName", StringType(), False), # 洲名称英文
StructField("countryName", StringType(), False), # 国家名称
StructField("countryEnglishName", StringType(), False), # 国家名称英文
StructField("provinceName", StringType(), False), # 省名称
StructField("provinceEnglishName", StringType(), False), # 省名称英文
StructField("province_zipCode", StringType(), False), # 省邮编
StructField("province_confirmedCount", IntegerType(), False), # 省确诊数
StructField("province_suspectedCount", IntegerType(), False), # 省疑似病例数
StructField("province_curedCount", IntegerType(), False), # 省治愈者数
StructField("province_deadCount", IntegerType(), False), # 省死亡数
StructField("updateTime", DateType(), False), # 更新时间
StructField("cityName", StringType(), False), # 城市名
StructField("cityEnglishName", StringType(), False), # 城市英文名
StructField("city_zipCode", StringType(), False), # 城市邮编
StructField("city_confirmedCount", IntegerType(), False), # 城市确诊数
StructField("city_suspectedCount", IntegerType(), False), # 城市疑似病例数
StructField("city_curedCount", IntegerType(), False), # 城市治愈数
StructField("city_deadCount", IntegerType(), False)] # 城市死亡数
schema = StructType(fields) # 生成RDD
rdd0 = spark.sparkContext.textFile(hdfsurl)
rdd1 = rdd0.map(lambda x: x.split("\t")).map(
lambda p: Row(p[0], p[1], p[2], p[3], p[4], p[5], p[6],
int(float(p[7])), int(float(p[8])), int(float(p[9])), int(float(p[10])),
toDate(p[11]), p[12], p[13], p[14],
int(float(p[15])), int(float(p[16])), int(float(p[17])), int(float(p[18]))))
# 生成DataFrame
df = spark.createDataFrame(rdd1, schema)
return df
三、DataFrame操作
DataFrame支持两种风格进行编程,分别是:
- DSL风格
DSL称之为:领域特定语言。 其实就是指DataFrame的特有API DSL风格意思就是以调用API的方式来处理Data 比如:where().limit()
- SQL风格
SQL风格就是使用SQL语句处理DataFrame的数据 比如:sql(“SELECT * FROM xxx)
在这里,我们使用sparksql对数据进行处理并直接写入mysql
DSL风格处理城市信息
使用API式的SparkSQL,可读性更好,写起来更直观,缺点是如果仅仅使用SparkSQL提供的API,功能受限不自由,重复迭代计算性能一般。
数据库结构如下
其中,除了confirm_add和id需要计算,其他都可以从原始的DataFrame中读取出来
- confirm_add的计算
sessionWindow =partitionBy("city").orderBy("update_time") # 限定城市并以时间为依据
创建以城市为分区,以更新时间为依据,在confirm数据上实现错位相减,让后一天的confirm数据表减去当天confirm,计算出每日该城市的新增确诊病例。
- id的计算
ascendWindow =orderBy("update_time") # 生成递增序号
根据时间创建递增序列。
# DSL风格sparksql,生成城市新冠疫情统计数据
def store_city_data(df: DataFrame):
df1 = df.select(
"updateTime",
"provinceName",
"cityName",
"city_confirmedCount",
"city_curedCount",
"city_deadCount",
)
# DataFrame列重命名
df2 = df1.withColumnRenamed("updateTime", "update_time"). \
withColumnRenamed("provinceName", "province"). \
withColumnRenamed("cityName", "city"). \
withColumnRenamed("city_confirmedCount", "confirm"). \
withColumnRenamed("city_curedCount", "heal"). \
withColumnRenamed("city_deadCount", "dead")
# 根据时间和城市把一天内多次统计的信息去重
df3 = df2.dropDuplicates(['update_time', 'city'])
# 根据时间排序
df4 = df3.sort(df3["update_time"].asc())
# 创建窗口函数
sessionWindow = Window.partitionBy("city").orderBy("update_time")
# 限定城市并以时间为依据
ascendWindow = Window.orderBy("update_time")
# 生成递增序号
df5 = df4.withColumn("confirm_add",
func.lag(df4["confirm"], 1).over(sessionWindow)). \
withColumn("id", func.row_number().over(ascendWindow))
# 连接并保存到mysql
df5.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://Master:3306/covid?useSSL=false&useUnicode=true"). \
option("dbtable", "details"). \
option("user", "root"). \
option("password", "HUSTeic2021"). \
save()
生成的数据库格式和DataFrame的格式一致
SQL风格处理全国信息
使用通用SQL形式的SparkSQL,优点是接受度高,而且从DataFrame生成临时表后在使用SQL查询效率更高,而且能实现很多复杂操作,缺点是可读性不如DSL。
数据库结构如下
计算全国数据的思路如下
- 我们有各个省份每天都确诊,疑似,治愈,死亡数据,但是不是每天都有各个省份的数据。如果想要计算全国数据的话不能直接简单相加
- 由上一个数据库建立的思想可以延伸到本次计算,首先计算每日每省数据新增,按照时间来分组,计算每组各个数据之和即是每日新增数据
- 获得每日新增数据后,可以累加逐日数据来计算每日的累计数据
# SQL风格sparksql,生成全国疫情统计
def store_history_data(df: DataFrame, sc: SparkSession):
# 从DataFrame中选择合适的数据创建新的DataFrame
df1 = df.select(
"updateTime",
# 更新时间
"provinceName",
# 省名称
"province_confirmedCount",
# 省确认数
"province_suspectedCount",
# 省疑似数
"province_curedCount",
# 省治愈数
"province_deadCount"
# 省死亡数
)
# 根据更新时间和省名称去重
df2 = df1.dropDuplicates(['updateTime', 'provinceName'])
# 根据时间升序排序
df3 = df2.sort(df2["updateTime"].asc())
# 由RDD创建临时表
df3.createOrReplaceTempView("cntotal")
# 计算疑似病例
suspect_df = sc.sql("select updateTime as ds1,"
"sum(province_suspectedCount) "
"as suspect from cntotal group by updateTime")
# 计算每个省份对于上次时间的新增数据差
raw_df = sc.sql("select t1.updateTime as ds,"
"t1.province_confirmedCount as confirm,"
"t1.province_suspectedCount as suspect,"
"t1.province_curedCount as heal, "
"t1.province_deadCount as dead, "
"t1.province_confirmedCount-t2.province_confirmedCount as confirm_add, "
"t1.province_suspectedCount-t2.province_suspectedCount as suspect_add, "
"t1.province_curedCount-t2.province_curedCount as heal_add, "
"t1.province_deadCount-t2.province_deadCount as dead_add"
" from cntotal t1,"
"cntotal t2 where t1.updateTime = date_add(t2.updateTime,1) and t1.provinceName = t2.provinceName")
raw_df.createOrReplaceTempView("cntotal")
# 创建临时表
# 计算全国每日新增数据
diff = sc.sql("select ds, sum(confirm_add) as confirm_add, " "sum(suspect_add) as suspect_add, " "sum(heal_add) as heal_add, " "sum(dead_add) as dead_add from cntotal group by ds") diff.createOrReplaceTempView("cntotal") # 创建临时表 # 计算全国逐日累计数据 cum = sc.sql("select ds as ds1, " "sum(confirm_add) over(order by ds asc)as confirm , " "sum(heal_add) over(order by ds asc) as heal, " "sum(dead_add) over(order by ds asc) as dead " "from cntotal") # 将DataFrame进行连接操作,使得三个DataFrame合成一个 history = diff.join(suspect_df, suspect_df["ds1"] == diff["ds"], "inner") history = history.join(cum, cum["ds1"] == history["ds"], "inner") history = history.drop("ds1").sort(history["ds"].asc()) # 将DataFrame写入mysql history.write.mode("overwrite"). \ format("jdbc"). \ option("url", "jdbc:mysql://Master:3306/covid?useSSL=false&useUnicode=true"). \ option("dbtable", "history"). \ option("user", "root"). \ option("password", "HUSTeic2021"). \ save()
生成的数据库格式和DataFrame的格式一致