博客
关于我
Spark Streaming流式数据处理
阅读量:621 次
发布时间:2019-03-13

本文共 3423 字,大约阅读时间需要 11 分钟。

Spark Streaming 简介与实践指南

Spark Streaming 是 Apache Spark lifespan 项目的核心扩展功能,专注于实现实时数据流的可伸缩、高吞吐量和容错流处理。该功能允许数据源从 Kafka、Kinesis、TCP sockets 等多个来源接收,并通过高级操作(如 map、reduce、join、window)对数据流进行复杂算法处理,最后将结果推送至文件系统、数据库或实时仪表板。此外,Spark Streaming 还支持在数据流上部署 MLlib 机器学习和 GraphX 图形处理算法。


简单示例:创建一个基本的 WordCount 应用

以下是一个使用 Spark Streaming 创建简单但功能齐全的 WordCount 应用的示例:

# 创建 Maven 项目
org.apache.spark
spark-core_2.11
${spark_version}
org.apache.spark
spark-sql_2.11
${spark_version}
org.apache.spark
spark-streaming_2.11
${spark_version}
import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}object NcWordCount {    def main(args: Array[String]): Unit = {        // 创建 SparkConf 对象        val conf: SparkConf = new SparkConf()            .setAppName(this.getClass.getName)            .setMaster("local[4)")        // 创建 StreamingContext 对象        val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))        // 创建接收数据流的输入流(这里使用 minh局域网中的虚拟机 IP 和端口)        val line: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.237.160", 1234)        // 对接收到的数据流进行处理(每批处理间隔为 5 秒)        line.flatMap(_.split(" "))            .map((_, 1))            .reduceByKey(_ + _)            .print()        // 启动流处理程序        ssc.start()        // 等待程序完成        ssc.awaitTermination()    }}

Spark Streaming 核心类解析

在使用 Spark Streaming 之前,需要了解以下核心组件及其作用:

1. StreamingContext


创建方式

可以通过以下方式创建 StreamingContext

import org.apache.spark._; // 包含所有 Spark APIimport org.apache.spark.streaming._; // 包含 Streaming 相关 API// 第一种方式:通过 SparkConf 创建val conf = new SparkConf()    .setAppName(" sparkStreamingApp ")    .setMaster("local[4]") // 推лара,指定集群地址或本地运行val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))// 第二种方式:通过已有的 SparkContext 创建val sc: SparkContext = ... // (获取现有的 SparkContext)val ssc = new StreamingContext(sc, Seconds(5))

主要功能

  • 作为所有 Spark Streaming 功能的入口点。
  • 需要通过 start() 启动流处理程序,awaitTermination() 等待程序结束。
  • 提供多种创建输入流的方式,如文件系统、Socket、Kafka、Flume 等。

  • 2. Discretized Stream (DStream)


    DStream 是 Spark Streaming 的核心抽象概念。

  • 概念描述:DStream 用于表示和处理连续数据流。
  • 创建方式
    • 由输入源(如 Kafka、Flume、Socket 等)生成的输入流 DStream。
    • 通过对现有 DStream 应用的高级操作生成新的 DStream。
  • 内部实现:DStream 在底层由一系列 RDD(Resilient Distributed Datasets)表示,每个 RDD 对应特定的时间窗口内的数据。
  • 操作示例
    • flatMap:将输入流中的记录(行)映射为更小的数据单元(如单词)。
    • reduceByKey:对相同键的记录进行聚合。
    • join:将数据流中的记录进行关联操作。

  • 3. 输入流(Input DStreams)


    输入流是 Spark Streaming 中的关键组件,负责从外部数据源接收数据并在 Spark 集群中建立 DStream。

    1. 基础数据源

    • Socket:通过 TCP sockets 接收数据流(本地或远程)。
    • File:通过文件系统(HDFS、S3 等)递送更新文件中的数据流。
    • RDD:通过队列(RDD queue)测试 Spark Streaming 程序。

    2. 高级数据源

    • Flume:通过 Flume 传输数据流。
    • Kafka:通过 Kafka 消息队列接收数据流。
    • Kinesis:通过 AWS Kinesis 接收数据流。

    3. 自定义接收器

    可以通过自定义接收器接收数据流。例如,使用Scala的 Receiver 类或 Java 的 Receiver 接口实现自定义接收逻辑。


    Spark Streaming 优化指南


    1. 设置合适的批处理间隔

    • 优点:增大批处理时间可以减少存储压力(如 HDFS "'.$"" 的写入压力)。
    • 警告:过大的批处理间隔可能导致数据丢失或延迟增加。
    1. 输入流配置

      • 确保接收器线程数量与集群里的 core 数量相当。
      • 本地运行时,建议使用 local[n] 格式的主 URL。
    2. 性能调优

      • 合理设置参数 sparkLocalRSSplitSizeBytes 和 `spark ($("#blockManagerId #blockId = BlockManagerId disk: kiểu Fi幕","#"prs:"
        ")
      • 定期检查与清理旧的 RDD 和使用较新的 Spark 版本(建议使用最新稳定版本)。

      总结

      Spark Streaming 是 Spark 技术的重要扩展功能,能够高效处理实时数据流。通过合理配置输入源、优化数据处理逻辑以及监控性能指标,可以实现高吞吐量、容错性和可扩展性的数据流处理任务。希望本文的内容能够为您的 Spark Streaming 开发提供帮助!

    转载地址:http://zbxaz.baihongyu.com/

    你可能感兴趣的文章
    npm安装 出现 npm ERR! code ETIMEDOUT npm ERR! syscall connect npm ERR! errno ETIMEDOUT npm ERR! 解决方法
    查看>>
    npm安装crypto-js 如何安装crypto-js, python爬虫安装加解密插件 找不到模块crypto-js python报错解决丢失crypto-js模块
    查看>>
    npm安装教程
    查看>>
    npm报错Cannot find module ‘webpack‘ Require stack
    查看>>
    npm报错Failed at the node-sass@4.14.1 postinstall script
    查看>>
    npm报错fatal: Could not read from remote repository
    查看>>
    npm报错File to import not found or unreadable: @/assets/styles/global.scss.
    查看>>
    npm报错unable to access ‘https://github.com/sohee-lee7/Squire.git/‘
    查看>>
    npm版本过高问题
    查看>>
    npm的“--force“和“--legacy-peer-deps“参数
    查看>>
    npm的安装和更新---npm工作笔记002
    查看>>
    npm的常用配置项---npm工作笔记004
    查看>>
    npm的问题:config global `--global`, `--local` are deprecated. Use `--location=global` instead 的解决办法
    查看>>
    npm编译报错You may need an additional loader to handle the result of these loaders
    查看>>
    npm设置淘宝镜像、升级等
    查看>>
    npm设置源地址,npm官方地址
    查看>>
    npm配置安装最新淘宝镜像,旧镜像会errror
    查看>>
    NPM酷库052:sax,按流解析XML
    查看>>
    npm错误 gyp错误 vs版本不对 msvs_version不兼容
    查看>>
    npm错误Error: Cannot find module ‘postcss-loader‘
    查看>>