Intellij IDEA for Spark w/ Scala examples

3 minute read

“And why I don’t use Eclipse for Spark”

Why I don’t use Eclipse for Spark?

I tried eclipse, Atom, Sublime and even Emacs before settling on IntelliJ. The reason I finally went back tp Intellij is the same like most of other Scala developers – the more stable IDE and more features.

Since the Scala IDE team also showed interest to move to VS Code back in 2017 and started a few new projects on GitHub, there’s really no use to stick to Eclipse when it’s already not the top choice from Scala’s own team.

And as I quote from this Quora user here:

Having tried Eclipse on and off, and sticking IntelliJ for a while, its a tradeoff between being less useful but more responsive/performant (Eclipse) vs less responsive/performance but more useful (IntelliJ).

The auto-completion & refactoring features in IntelliJ work really well when we are using Java but it becomes more like an issue when it comes to Scala. The type system in Scala is very complicated so sometimes it caues more trouble than easying the burden.(e.g. the incorrect highlighting, libraries importing…)

However, compared to the less rich features the Eclipse provides, I’m more than happy to stick to IntelliJ than going back to Eclipse, especially when it already gave me so much painful memory while doing some Java web projects & I have the liscence for IntelliJ Ultimate version ; )

New Scala projects in INtelliJ

Set up DEV environment in IntelliJ for Scala

In 2019
  1. Config

I directly followed this guide from JetBrains. but its worthwhile to check this post from itversity in 2018. It has more thourough guides.

  1. Running
  • Local running: just go to “Run” - > “Run Configurations”
  • Running in Spark clusters: pack the program as Jar as use shell. Select “File” –> “Project Structure” –> “Artifact”,then select “+” –> “Jar” –> “From Modules with dependencies”, and choose main function and select the jar location in the pop up. Finally, just choose “Build” –> “Build Artifact” and compile to jar.

./bin/spark-shell --master <master-url>

If we use local mode in Spark commands and run it on 4 CPU cores, the command will simply become ./bin/spark-shell --master local[4].

And for convinence, it’s better to config system parameter:

vi /etc/profile
 
# add following to the end of the file
export PATH=$PATH:/usr/local/spark-[version]-bin-hadoop[version]/bin
 
# activate the change
source /etc/profile

Scala Code examples

Word Count

4 paramters: Spark master location, program name, Spark installation log and Jar location.

import org.apache.spark._
import SparkContext._

val sc = new SparkContext(
    args(0), "WordCount", 
    System.getenv("SPARK_HOME"),
    Seq(System.getenv("SPARK_TEST_JAR"))
)

// read in file
val textFile = sc.textFile(args(1))

// directly create a Hadoop RDD Object
var hadoopRdd = new HadoopRDD(
    sc, 
    conf,
    classOf[SequenceFileInputFormat[Text,Text,classOf[Text], classOf[Text]]
    )

// first get the word from input & put the same word in one bucket, then count the frequences.

val result = hadoopRdd.flatMap{
    case (key,value) = > value.toString().split("\\s+");
    }.map(
        word = > (word, 1)).reduceByKey (_ +_)
result.saveAsSequenceFile(args(2))

Top K

Top K task has many answers, either in algorithm way or Big data. Here in Spark, we simply follow the avove program and find the top K words.

A lot of tech blogs tend to use the top method from SparkAPI to calculate, but we can also use the algorithm way, which is heap sort to get the answer.

Here’s the common way:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
 
object TopK {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.out.println("Usage: <src> <num>")
      System.exit(1)
    }
 
    val conf = new SparkConf().setAppName("TopK")
    val sc = new SparkContext(conf)
 
    val lines = sc.textFile(args(0))
    val ones = lines.flatMap(_.split(" ")).map(word => (word, 1))
    val count = ones.reduceByKey((a, b) => a + b)
    val convert = count.map {
      case (key, value) => (value, key)
    }.sortByKey(true, 1)
    convert.top(args(1).toInt).foreach(a => System.out.println("(" + a._2 + "," + a._1 + ")"))
}

Here’s the Heap method, taken from StackOverflow.

def pickTopN[A, B](n: Int, iterable: Iterable[A], f: A => B)(implicit ord: Ordering[B]): Seq[A] = {
  val seq = iterable.toSeq
  val q = collection.mutable.PriorityQueue[A](seq.take(n):_*)(ord.on(f).reverse) // initialize with first n

  // invariant: keep the top k scanned so far
  seq.drop(n).foreach(v => {
    q += v
    q.dequeue()
  })

  q.dequeueAll.reverse
}

Leave a comment