基于PySpark的新冠疫情数据分析

大数据 spark

一、数据源

2019新型冠状病毒疫情时间序列数据仓库

本项目为2019新型冠状病毒(COVID-19/2019-nCoV)疫情状况的时间序列数据仓库,数据来源为丁香园

链接

https://github.com/BlankerL/DXY-COVID-19-Data https://github.com/BlankerL/DXY-COVID-19-Data/releases/tag/2022.01.04

二、数据预处理

CSV->TXT

使用pandas进行数据预处理, 在世界范围内的数据中筛选出中国的数据

import pandas as pd  
# .csv->.txt  
data = pd.read_csv('/home/hadoop/datasets/overall_covid/DXYArea.csv')
with open('/home/hadoop/datasets/overall_covid/DXYArea_province.txt', 'a+', encoding='utf-8') as f:
     for line in data.values:
         tmp = ""
         if str(line[2]) == "中国":
             for i in line:
                 tmp = tmp + str(i) + '\t'
             tmp = tmp + '\n'
             f.write(tmp)

以下为基础数据结构

等处理好基础文件格式后,将文件上传到HDFS文件系统。之所以要上传到HDFS是因为spark on yarn模式下会进行分布式计算,要使用分布式计算就必须使用分布式文件系统,因此统一上传到HDFS中。

处理日期格式

创建RDD的过程中可以使用时间类datetime,这是我们可以对时间进行加减和比较,方便进行后续计算。

def toDate(inputStr):
     date = datetime.strptime(inputStr, "%Y-%m-%d %H:%M:%S")# 年-月-日 时:分:秒
     return date

建立spark连接

Spark RDD 编程的程序执行入口是SparkContext对象(不论何种编程语言) 只有,构建出SparkContext, 基于它才能执行后续的API调用和计算 本质上, SparkContext对编程来说, 主要功能就是创建第一个RDD出来

# 返回执行应用名称为spark_app的SparkContext对象 
def connect_to_spark(AppName="spark_app"):
     conf = SparkConf().setAppName(AppName)
     spark = SparkSession.builder.config(conf=conf).getOrCreate()
     return spark

TXT->RDD->DataFrame

RDD的创建主要有2种方式:

  1. 通过并行化集合创建 ( 本地对象转分布式RDD )
  2. 读取外部数据源 ( 读取文件 )

这里我们选择读取HDFS上的txt文件创建RDD,并从RDD创建DataFrame

# 从HDFS的txt文件中获取RDD
def get_dataframe_from_txt(hdfsurl, spark: SparkSession):
     # "hdfs://Master:9000/user/hadoop/covid/DXYArea_China.txt"
     # 创建RDD模式
     fields = [StructField("continentName", StringType(), False),  # 洲名称
               StructField("continentEnglishName", StringType(), False),  # 洲名称英文
               StructField("countryName", StringType(), False),  # 国家名称
               StructField("countryEnglishName", StringType(), False),  # 国家名称英文
               StructField("provinceName", StringType(), False),  # 省名称
               StructField("provinceEnglishName", StringType(), False),  # 省名称英文
               StructField("province_zipCode", StringType(), False),  # 省邮编
               StructField("province_confirmedCount", IntegerType(), False),  # 省确诊数
               StructField("province_suspectedCount", IntegerType(), False),  # 省疑似病例数
               StructField("province_curedCount", IntegerType(), False),  # 省治愈者数
               StructField("province_deadCount", IntegerType(), False),  # 省死亡数
               StructField("updateTime", DateType(), False),  # 更新时间
               StructField("cityName", StringType(), False),  # 城市名
               StructField("cityEnglishName", StringType(), False),  # 城市英文名
               StructField("city_zipCode", StringType(), False),  # 城市邮编
               StructField("city_confirmedCount", IntegerType(), False),  # 城市确诊数
               StructField("city_suspectedCount", IntegerType(), False),  # 城市疑似病例数
               StructField("city_curedCount", IntegerType(), False),  # 城市治愈数
               StructField("city_deadCount", IntegerType(), False)]  # 城市死亡数
     schema = StructType(fields)     # 生成RDD
     rdd0 = spark.sparkContext.textFile(hdfsurl)
     rdd1 = rdd0.map(lambda x: x.split("\t")).map(
         lambda p: Row(p[0], p[1], p[2], p[3], p[4], p[5], p[6],
                       int(float(p[7])), int(float(p[8])), int(float(p[9])), int(float(p[10])),
                       toDate(p[11]), p[12], p[13], p[14],
                       int(float(p[15])), int(float(p[16])), int(float(p[17])), int(float(p[18]))))
     # 生成DataFrame
     df = spark.createDataFrame(rdd1, schema)
     return df

三、DataFrame操作

DataFrame支持两种风格进行编程,分别是:

  1. DSL风格

    DSL称之为:领域特定语言。 其实就是指DataFrame的特有API DSL风格意思就是以调用API的方式来处理Data 比如:where().limit()

  2. SQL风格

    SQL风格就是使用SQL语句处理DataFrame的数据 比如:sql(“SELECT * FROM xxx)

在这里,我们使用sparksql对数据进行处理并直接写入mysql

DSL风格处理城市信息

使用API式的SparkSQL,可读性更好,写起来更直观,缺点是如果仅仅使用SparkSQL提供的API,功能受限不自由,重复迭代计算性能一般。

数据库结构如下

其中,除了confirm_add和id需要计算,其他都可以从原始的DataFrame中读取出来

  1. confirm_add的计算

    sessionWindow =partitionBy("city").orderBy("update_time") # 限定城市并以时间为依据

    创建以城市为分区,以更新时间为依据,在confirm数据上实现错位相减,让后一天的confirm数据表减去当天confirm,计算出每日该城市的新增确诊病例。

  2. id的计算

    ascendWindow =orderBy("update_time") # 生成递增序号

    根据时间创建递增序列。

# DSL风格sparksql,生成城市新冠疫情统计数据 
def store_city_data(df: DataFrame): 
    df1 = df.select( 
        "updateTime", 
        "provinceName", 
        "cityName", 
        "city_confirmedCount", 
        "city_curedCount", 
        "city_deadCount", 
    ) 
    # DataFrame列重命名 
    df2 = df1.withColumnRenamed("updateTime", "update_time"). \ 
        withColumnRenamed("provinceName", "province"). \ 
        withColumnRenamed("cityName", "city"). \ 
        withColumnRenamed("city_confirmedCount", "confirm"). \ 
        withColumnRenamed("city_curedCount", "heal"). \ 
        withColumnRenamed("city_deadCount", "dead") 
     # 根据时间和城市把一天内多次统计的信息去重 
    df3 = df2.dropDuplicates(['update_time', 'city']) 
     # 根据时间排序 
    df4 = df3.sort(df3["update_time"].asc()) 
     # 创建窗口函数 
    sessionWindow = Window.partitionBy("city").orderBy("update_time") 
      # 限定城市并以时间为依据 
    ascendWindow = Window.orderBy("update_time") 
                           # 生成递增序号 
    df5 = df4.withColumn("confirm_add", 
                         func.lag(df4["confirm"], 1).over(sessionWindow)). \ 
        withColumn("id", func.row_number().over(ascendWindow)) 
          # 连接并保存到mysql 
    df5.write.mode("overwrite"). \ 
        format("jdbc"). \ 
        option("url", "jdbc:mysql://Master:3306/covid?useSSL=false&useUnicode=true"). \ 
        option("dbtable", "details"). \ 
        option("user", "root"). \ 
        option("password", "HUSTeic2021"). \ 
        save()

生成的数据库格式和DataFrame的格式一致

SQL风格处理全国信息

使用通用SQL形式的SparkSQL,优点是接受度高,而且从DataFrame生成临时表后在使用SQL查询效率更高,而且能实现很多复杂操作,缺点是可读性不如DSL。

数据库结构如下

计算全国数据的思路如下

  1. 我们有各个省份每天都确诊,疑似,治愈,死亡数据,但是不是每天都有各个省份的数据。如果想要计算全国数据的话不能直接简单相加
  2. 由上一个数据库建立的思想可以延伸到本次计算,首先计算每日每省数据新增,按照时间来分组,计算每组各个数据之和即是每日新增数据
  3. 获得每日新增数据后,可以累加逐日数据来计算每日的累计数据
# SQL风格sparksql,生成全国疫情统计 
def store_history_data(df: DataFrame, sc: SparkSession):
     # 从DataFrame中选择合适的数据创建新的DataFrame
     df1 = df.select(
         "updateTime",
	        # 更新时间
         "provinceName",
	        # 省名称
         "province_confirmedCount",
          # 省确认数
         "province_suspectedCount",
          # 省疑似数
         "province_curedCount",
          # 省治愈数
         "province_deadCount"
          # 省死亡数
     )
          # 根据更新时间和省名称去重
     df2 = df1.dropDuplicates(['updateTime', 'provinceName'])
          # 根据时间升序排序
     df3 = df2.sort(df2["updateTime"].asc())
          # 由RDD创建临时表
     df3.createOrReplaceTempView("cntotal")
          # 计算疑似病例
     suspect_df = sc.sql("select updateTime as ds1,"
                         "sum(province_suspectedCount) "
                         "as suspect from cntotal group by updateTime")
          # 计算每个省份对于上次时间的新增数据差
     raw_df = sc.sql("select t1.updateTime as ds,"
                     "t1.province_confirmedCount as confirm,"
                     "t1.province_suspectedCount as suspect,"
                     "t1.province_curedCount as heal, "
                     "t1.province_deadCount as dead, "
                     "t1.province_confirmedCount-t2.province_confirmedCount as confirm_add, "
                     "t1.province_suspectedCount-t2.province_suspectedCount as suspect_add, "
                     "t1.province_curedCount-t2.province_curedCount as heal_add, "
                     "t1.province_deadCount-t2.province_deadCount as dead_add"
                     " from cntotal t1,"
                     "cntotal t2 where t1.updateTime = date_add(t2.updateTime,1) and t1.provinceName = t2.provinceName")
     raw_df.createOrReplaceTempView("cntotal")
       # 创建临时表
          # 计算全国每日新增数据
     diff = sc.sql("select ds, sum(confirm_add) as confirm_add, "                   "sum(suspect_add) as suspect_add, "                   "sum(heal_add) as heal_add, "                   "sum(dead_add) as dead_add from cntotal group by ds")     diff.createOrReplaceTempView("cntotal")         # 创建临时表          # 计算全国逐日累计数据     cum = sc.sql("select ds as ds1, "                  "sum(confirm_add) over(order by ds asc)as confirm , "                  "sum(heal_add) over(order by ds asc) as heal, "                  "sum(dead_add) over(order by ds asc) as dead "                  "from cntotal")          # 将DataFrame进行连接操作,使得三个DataFrame合成一个     history = diff.join(suspect_df, suspect_df["ds1"] == diff["ds"], "inner")     history = history.join(cum, cum["ds1"] == history["ds"], "inner")     history = history.drop("ds1").sort(history["ds"].asc())          # 将DataFrame写入mysql     history.write.mode("overwrite"). \         format("jdbc"). \         option("url", "jdbc:mysql://Master:3306/covid?useSSL=false&useUnicode=true"). \         option("dbtable", "history"). \         option("user", "root"). \         option("password", "HUSTeic2021"). \         save()

生成的数据库格式和DataFrame的格式一致