基于大数据SparkSQL城市燃气输配监测数据分析方法初探

(整期优先)网络出版时间:2018-06-16
/ 3

基于大数据SparkSQL城市燃气输配监测数据分析方法初探

刘雪岭燕如庭王晓彤杨富元冯仁婕段云飞

贵州理工学院矿业工程学院安全工程教研室550003

摘要:SparkSQL2.0.X开始运用新的线程概念SparkSession,从而取代了原本的SQLContext与HiveContext,并更新了数据框架DataFrames。分布式文件系统HDFS可以为城市燃气数据提供稳定可靠的海量数据存储能力,而SparkSQL为其提供高性能的内存计算和数据分析框架。基于SparkSQL的城市燃气输配大数据分析,初步设计并实现通过分布式存储与最新的SparkSQL大数据计算技术,提升查询性能与系统可扩展性的方法。

关键词:SparkSQL;燃气输配;大数据;数据统计

1引言

随着城市现代化建设的发展和城市燃气管网规模的不断扩大,SCADA,SupervisoryControlAndDataAcquisition)系统已广为应用。可对现场的运行设备进行监视和控制,以实现数据采集、设备控制、测量、参数调节以及各类信号报警等各项功能[1,2]。然而针对燃气企业不同管理部门的职能特点,不同管理部门所需要数据也不尽相同,例如,运行管理部门主要监控压力、管线流量等方面的实时分析,确定区域管线运转是否正常,压力是否平稳;而营业管理部门则重点对通过气量的监测,实时分析上下游进销气量情况,掌握每天、甚至每个小时企业整体的供销差率,进而提升燃气企业整体工作的反应速度。因此,要实现数据的统一分析,就要对不同的系统功能、数据结构进行整合,形成统一的数据库,为进行数据分析提供基础保障,并为调度管理的远程监控提供便利。另外,传统的SCADA是基于关系数据库的设计,以满足对燃气应用终端产生的海量数据进行全面、有效分析和查询,暴露出以下三个突出问题:(1)随着海量数据的逐步增加,数据读写性能下降。(2)跨表连接性能逐步下降。(3)数据表扩展性差。

近几年,以Hadoop、Spark为代表的分布式存储与计算框架在工业界得到了广泛的应用,它们具有高写入吞吐能力、灵活的可扩展性与高可用性等特点。Hadoop中的高吞吐分布式文件系统HDFS可以为城市燃气数据提供稳定可靠的海量数据存储能力,而Spark[3]可以为其提供高性能的内存计算框架。本文设计并实现了基于SparkSQL的城市燃气输配大数据分析方法,通过分布式存储与计算技术提升查询性能与系统可扩展性。

2SparkSQL相关工作

在当前现有的Hadoop应用中,大数据首先存储在HDFS,通过Map与Reduce的功能处理各种查询处理。然而,由于大数据查询处理过程中硬盘的瓶颈效应,查询过程效率受到一定的限制。而SparkSQL通过最新的DataFrameAPI及其编程,使查询结构型数据成为可能,并大大提高了效率。Shark的出现,就已使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高。而最新的Spark重新开发了兼容性、独立性均高的数据仓库系统SparkSQL。SparkSQL2.0.X[3]版摆脱了Hive的限制,抛弃原有Shark的代码,汲取了Shark的一些优点,同时采样内存列存储(In-MemoryColumnarStorage)、字节码生成技术(bytecodegeneration,CG)技术,并对Scala代码优化。由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的提升,为分析人员提供了便利的查询工具。如图1所示,SparkSQL不但可通过JDBC/ODBC和命令行控制窗(command-lineconsole)访问,同样也可以通过建立数据框架接口DataFrameAPI进行数据查询访问,增加了外部数据的多种解决途径。

图1SparkSQL结构关系框架

3.SparkSQL数据查询方法

从SparkSQL2.0.X开始,Spark的数据查询运用了新的线程概念SparkSession,取代了原本的SQLContext与HiveContext,以及新的数据框架DataFrames理念。创建SQL查询需先建立一个线程与一个数据框架。以JAVA调用为例:

SparkSession线程启动如下:

importorg.apache.spark.sql.SparkSession;//调入并建立SQL查询的SparkSession;

SparkSessionspark=SparkSession

.builder()

.appName("gas-distribution")

.config(conf)

.getOrCreate();;

在SparkSession线程创建完成后,即可对不同类型及来源的数据进行数据框架DataFrames的创建:

importorg.apache.spark.sql.Dataset;//调入SparkSQL数据集;

importorg.apache.spark.sql.Row;//调入并建立SparkSQL数据集的行;

Dataset<Row>df=spark.read().json("data/gas-distribution-dispatching.json");//读取数据源;

df.createOrReplaceTempView("gas-distribution-dispatching");//将读取数据源注册为一个SQL临时查询表;

Dataset<Row>sqlDF=spark.sql("SELECT*FROMgas-distribution-dispatching");

sqlDF.show();//显示数据查询结果。

Dataset是SparkSQL2.0.X引入的一个新的特性。DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder当序列化数据时,Encoder产生字节码与off-heap进行交互,,能够达到按需访问数据的效果,而不用反序列化整个对象。SparkSQL语法和用法和mysql有一定的相似性,可以查看表、表结构、查询、聚合等操作。用户可以使用SparkSQL的API接口做聚合查询等操作或者用类SQL语句实现,但是必须将DataSet注册为临时查询表。

4.城市燃气输配数据SparkSQL查询与统计

4.1城市燃气输配数据集查询

对于已建立好的Hadoop分布式城市燃气输配管网压力及流量监控大数据,利用Java反射的特性,首先创建一个大数据对象的RDD,随后对表结构推断,主要代码如下:

importorg.apache.spark.api.java.JavaRDD;

importorg.apache.spark.api.java.function.Function;

importorg.apache.spark.api.java.function.MapFunction;

importorg.apache.spark.sql.Dataset;

importorg.apache.spark.sql.Row;

importorg.apache.spark.sql.Encoder;

importorg.apache.spark.sql.Encoders;

JavaRDD<Gas-Pressure&Flux>gasRDD=spark.read()

.textFile("..\\Gas-Pressure&Flux-Data\data.txt")

.javaRDD()

.map(newFunction<String,Gas-Pressure&Flux>()

publicGas-Pressure&Fluxcall(Stringline)throwsException{

String[]parts=line.split(",");

Gas-Pressure&Fluxgas=newGas-Pressure&Flux();

gas.setDateTime(parts[0]);

gas.setSensorNo(parts[1]);

gas.setPressure(Integer.parseInt(parts[2].trim()));

returngas;

Dataset<Row>gasDF=spark.createDataFrame(gasRDD,Gas-Pressure&Flux.class);

gasDF.createOrReplaceTempView("gas");

完成表结构推断后,即可通过SparkSQL运用SQL的查询语句进行数据的过滤、聚合等一系列查询任务,如

Dataset<Row>SensorNoDF=spark.sql("SELECT*FROMgasWHERESensorNo=1andPressure>=0.5");

4.2城市燃气输配数据集统计

城市燃气输配管网流量统计是营业管理部门实时分析上下游进销气量,掌握每天、甚至每个小时燃气输配供销变化的重要指导性数据。以spark的窗口统计功能为例,对燃气输配管网流量某月各星期的均值流量做统计。代码如下:

Dataset<Row>Gas-Flux=gasDF.filter("year(Date)==2016").

filter("month(Date)==6");

Dataset<Row>windowWithStartTime=Gas-Flux.

groupBy(window(Gas-Flux.col("DateTime"),"1week","1week","136hour")).

agg(avg("Flux").as("weekly_average"));

windowWithStartTime.sort("window.start").

select("window.start","window.end","weekly_average").

show();

5.结论

Spark对SparkSQL项目的改进与升级使之在SQL-on-Hadoop的性能有了质的飞跃,可以预见,SparkSQL将成为未来大数据时代结构性数据化处理的重要利器。本文基于SparkSQL城市燃气输配大数据分析方法,实现了高效的相关数据查询。实践表明,这些技术极大地提升了SparkSQL在处理城市燃气输配大数据查询的性能。下一步工作会关注SparkSQL与城市燃气输配大数据相关查询优化与算法问题方法。

参考文献:

[1]高猛潘元祯.城市燃气数据采集与监控(SCADA)系统应用介绍[J].科技信息,2010.12,470-471

[2]刘兴华,佟健鹏.燃气远程数据采集与实时监控分析的研究[J].天津科技,2014,41(12),23-24.

[3]SparkSQL,DataFramesandDatasetsGuide-Spark2.0.1Documentation.http://spark.apache.org.