用idea工具scala 和 Java开发 spark案例:WordCount

news/2024/7/16 8:22:13 标签: intellij-idea, java, ide, spark, scala

目录

一 环境准备

scala%E4%BB%A3%E7%A0%81%E7%BC%96%E5%86%99-toc" style="margin-left:0px;">二 scala代码编写

java%20%E4%BB%A3%E7%A0%81%E7%BC%96%E5%86%99-toc" style="margin-left:0px;">三 java 代码编写


一 环境准备

        创建一个 maven 工程

        添加下列依赖

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-graphx_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.62</version>
    </dependency>

        原本就下载过这些依赖的没必要再下一遍,可以用之前的,比如 json,mysql,mysq 这里版本是 mysql 5 ,不一样的注意修改

        

scala%E4%BB%A3%E7%A0%81%E7%BC%96%E5%86%99">二 scala代码编写

        首先准备好数据,即一个 txt 文本里面加一些单词,可以放在 hdfs 或本地或其它地方,读取的时候注意改代码,这里是读取 hdfs 上的 txt 文本,注意改成自己的地址

         新建一个 scala 的 object,编写代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object WordCountDemo {
  def main(args: Array[String]): Unit = {
    val conf : SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
    val sc : SparkContext = SparkContext.getOrCreate(conf)

    var spark : SparkSession = SparkSession.builder().config(conf).getOrCreate()

//    val rdd1: RDD[String] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt")
//    val rdd2: RDD[String] = rdd1.flatMap(x => x.split(" "))
//    val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
//    val result: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)

    val result2: RDD[(String, Int)] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)
    //打印到 console
    //    result2.glom().collect.foreach(x=>println(x.toList))
    //保存到 hdfs
    result2.saveAsTextFile("hdfs://101.200.63.3:9000/kb23/sparkoutput/wordcount")
  }

}

        这里稍微解释一下代码中的一些函数:

        map:转换函数,数据集合中每个元素进行一次我们定义的方法

        flatMap: 与map类似,但是映射为0个或多个

        collect:以数组的形式返回数据集中的所有元素 

        glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。

 

        云服务器的朋友可能有的报错

22/05/0305:48:53 WARN DFSClient: Failed to connect to /10.0.24.10:9866 for block, add to deadNodes and continue. org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]

        出现这种错误看字面意思就很容易明白,这是本地与 datanode 通信时,namenode 给的是 datanode 的内网 ip,所以本地找不到

        解决方法也很简单,设置一下让 namenode 传过来的是服务器名而不是 ip

        在 idea 中,resource 文件夹中添加文件 hdfs-site.xml

        hdfs-site.xml内容:

<!-- datanode 通信是否使用域名,默认为false,改为true -->
    <property>
        <name>dfs.client.use.datanode.hostname</name>
        <value>true</value>
        <description>Whether datanodes should use datanode hostnames whenconnecting to other datanodes for data transfer.
        </description>
    </property>

java%20%E4%BB%A3%E7%A0%81%E7%BC%96%E5%86%99">三 java 代码编写

        这里原数据存储在本地,文件名为 input.txt

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Map;

public class WordCount {
    public static void main(String[] args) {
        // 创建SparkConf对象
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local");

        // 创建JavaSparkContext对象
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 读取文本文件
        JavaRDD<String> lines = sc.textFile("input.txt");

        // 计算单词出现次数
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaRDD<String> filteredWords = words.filter(word -> !word.isEmpty());
        JavaPairRDD<String, Integer> wordCounts = filteredWords.mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((x, y) -> x + y);
        Map<String, Integer> wordCountsMap = wordCounts.collectAsMap();

        // 输出结果
        for (Map.Entry<String, Integer> entry : wordCountsMap.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }

        // 关闭JavaSparkContext对象
        sc.close();

    }
}


http://www.niftyadmin.cn/n/5075814.html

相关文章

Apache Doris (三十七):Doris数据导出 - Export导出

进入正文前,感谢宝子们订阅专题、点赞、评论、收藏!关注IT贫道,获取高质量博客内容! 🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊。 🔔 博主个人B栈地址:https://space.bilibili.com/4…

如何选择编程语言Python Go还是Rust?

选择编程语言需要考虑多个方面&#xff0c;包括语言的特性、社区支持、工作机会、学习曲线等。下面是关于Python Go和Rust的一些介绍。 1.基本语法 1. Python: Python 是一种脚本语言&#xff0c;以简洁、易读的语法著称。以下是 Python 的基本语法示例&#xff1a; # Hello…

Atomic原子类详解

为什么需要Atomic原子操作类&#xff1f; 在并发环境中&#xff0c;代码如果操作相同的数据&#xff0c;就会产生资源竞争&#xff0c;导致结果远小于预期值 例如在A线程B线程中同时获取到变量数据为1&#xff0c;同时执行变量1操作&#xff0c;结果可能也是1&#xff0c;存在脏…

github小记(一):清除github在add或者commit之后缓存区

github清除在add或者commit之后缓存区 前言1. 第一步之后想要撤销2. 第二步之后想要撤销a. 改变一下rrr.txt的内容b. 想提交本地文件的test文件夹c. 我后悔了突然不想提交了 前言 github自用 一般github上代码提交顺序&#xff1a; 第一步&#xff1a; git add . or git ad…

慢 SQL 的致胜法宝

大促备战&#xff0c;最大的隐患项之一就是慢SQL&#xff0c;对于服务平稳运行带来的破坏性最大&#xff0c;也是日常工作中经常带来整个应用抖动的最大隐患&#xff0c;在日常开发中如何避免出现慢SQL&#xff0c;出现了慢SQL应该按照什么思路去解决是我们必须要知道的。本文主…

基于亚马逊云科技Amazon EC2云服务器的G4实例可提供极具成本效益的GPU并支持实时光追技术

随着Android应用程序和游戏变得越来越丰富&#xff0c;其中有些甚至比PC上的软件更易于使用和娱乐&#xff0c;因此许多人希望能够在云上运行Android游戏或应用程序&#xff0c;而在Amazon EC2实例上运行Android的解决方案可以让开发人员更轻松地测试和运行Android应用程序。在…

WebDAV之π-Disk派盘 + 咕咚云图

咕咚云图是一款强大的图床传图软件,它能够让您高效地对手机中的各种图片进行github传输,多个平台快速编码上传,支持远程删除不需要的图片,传输过程安全稳定,让您可以很好的进行玩机或者其他操作。 可帮你上传手机图片到图床上,并生成 markdown 链接,支持七牛云、阿里云…

android多地图源切换

1、参考文章 https://github.com/osmdroid/osmdroid 安卓使用osmdroid显示谷歌地图、高德地图及离线地图详解_高德 android离线 sdk-CSDN博客