Spark学习笔记(6)—— 网站访问次数统计,spark学习笔记
分享于 点击 37516 次 点评:237
Spark学习笔记(6)—— 网站访问次数统计,spark学习笔记
1 数据文件
20160321101954 http://java.itcast.cn/java/course/javaeeadvanced.shtml
20160321101954 http://java.itcast.cn/java/course/javaee.shtml
20160321101954 http://java.itcast.cn/java/course/android.shtml
20160321101954 http://java.itcast.cn/java/video.shtml
20160321101954 http://java.itcast.cn/java/teacher.shtml
20160321101954 http://java.itcast.cn/java/course/android.shtml
20160321101954 http://php.itcast.cn/php/teacher.shtml
20160321101954 http://net.itcast.cn/net/teacher.shtml
20160321101954 http://java.itcast.cn/java/course/hadoop.shtml
20160321101954 http://java.itcast.cn/java/course/base.shtml
20160321101954 http://net.itcast.cn/net/course.shtml
20160321101954 http://php.itcast.cn/php/teacher.shtml
20160321101954 http://net.itcast.cn/net/video.shtml
20160321101954 http://java.itcast.cn/java/course/base.shtml
20160321101954 http://net.itcast.cn/net/teacher.shtml
20160321101954 http://java.itcast.cn/java/video.shtml
20160321101954 http://java.itcast.cn/java/video.shtml
20160321101954 http://net.itcast.cn/net/video.shtml
20160321101954 http://net.itcast.cn/net/course.shtml
20160321101954 http://java.itcast.cn/java/course/javaee.shtml
20160321101954 http://java.itcast.cn/java/course/android.shtml
..........
2 源码
2.1 测试1
package webcount
import org.apache.spark.{SparkConf, SparkContext}
object UrlCount {
  /**
   * Counts visits per URL from a tab-separated access log.
   * Each input line is expected as: `<timestamp>\t<url>`.
   * The input path may be overridden with the first command-line argument;
   * it defaults to the original hard-coded path for backward compatibility.
   */
  def main(args: Array[String]): Unit = {
    val inputPath = if (args.nonEmpty) args(0) else "d://urlcount.log"
    val conf = new SparkConf().setAppName("URLCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Split each line on TAB into (url, 1); drop malformed lines that
      // have no URL field instead of crashing the whole job on f(1).
      val rdd1 = sc.textFile(inputPath)
        .map(_.split("\t"))
        .filter(_.length > 1)
        .map(f => (f(1), 1))
      // Sum the 1s to get a total hit count per URL.
      val rdd2 = rdd1.reduceByKey(_ + _)
      println(rdd2.collect().toBuffer)
    } finally {
      sc.stop() // always release Spark resources, even on failure
    }
  }
}
ArrayBuffer((http://php.itcast.cn/php/course.shtml,459), (http://java.itcast.cn/java/course/base.shtml,543), (http://java.itcast.cn/java/video.shtml,496), (http://java.itcast.cn/java/course/android.shtml,501), (http://net.itcast.cn/net/video.shtml,521), (http://java.itcast.cn/java/course/hadoop.shtml,506), (http://net.itcast.cn/net/course.shtml,521), (http://java.itcast.cn/java/course/cloud.shtml,1028), (http://php.itcast.cn/php/video.shtml,490), (http://java.itcast.cn/java/teacher.shtml,482), (http://php.itcast.cn/php/teacher.shtml,464), (http://net.itcast.cn/net/teacher.shtml,512), (http://java.itcast.cn/java/course/javaee.shtml,1000), (http://java.itcast.cn/java/course/javaeeadvanced.shtml,477))
2.2 测试2
package webcount
import java.net.URL
import org.apache.spark.{SparkConf, SparkContext}
object UrlCount {
  /**
   * Counts visits per URL from a tab-separated access log, then groups the
   * resulting (host, url, hits) triples by host and prints them.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("URLCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Emit a (url, 1) pair for every log line; the URL is the second
    // TAB-separated field.
    val urlPairs = sc.textFile("d://urlcount.log").map { line =>
      val fields = line.split("\t")
      (fields(1), 1)
    }
    // Total hits per URL.
    val hitCounts = urlPairs.reduceByKey(_ + _)
    // Attach the host parsed out of each URL, keeping the full URL and count.
    val hostUrlHits = hitCounts.map { case (url, hits) =>
      (new URL(url).getHost, url, hits)
    }
    // Group the triples by their host component.
    val byHost = hostUrlHits.groupBy(_._1)
    println(byHost.collect().toBuffer)
    sc.stop()
  }
}
ArrayBuffer((net.itcast.cn,CompactBuffer((net.itcast.cn,http://net.itcast.cn/net/video.shtml,521), (net.itcast.cn,http://net.itcast.cn/net/course.shtml,521),
(net.itcast.cn,http://net.itcast.cn/net/teacher.shtml,512))), (java.itcast.cn,CompactBuffer((java.itcast.cn,http://java.itcast.cn/java/course/base.shtml,543),
(java.itcast.cn,http://java.itcast.cn/java/video.shtml,496), (java.itcast.cn,http://java.itcast.cn/java/course/android.shtml,501),
(java.itcast.cn,http://java.itcast.cn/java/course/hadoop.shtml,506), (java.itcast.cn,http://java.itcast.cn/java/course/cloud.shtml,1028),
(java.itcast.cn,http://java.itcast.cn/java/teacher.shtml,482), (java.itcast.cn,http://java.itcast.cn/java/course/javaee.shtml,1000),
(java.itcast.cn,http://java.itcast.cn/java/course/javaeeadvanced.shtml,477))), (php.itcast.cn,CompactBuffer((php.itcast.cn,http://php.itcast.cn/php/course.shtml,459),
(php.itcast.cn,http://php.itcast.cn/php/video.shtml,490), (php.itcast.cn,http://php.itcast.cn/php/teacher.shtml,464))))
2.3 测试3
package webcount
import java.net.URL
import org.apache.spark.{SparkConf, SparkContext}
object UrlCount {
  /**
   * Counts visits per URL, groups the (host, url, hits) triples by host,
   * and keeps only the top-3 most visited URLs per host.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("URLCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Split each line on TAB and emit (url, 1).
    val rdd1 = sc.textFile("d://urlcount.log").map(line => {
      val f = line.split("\t")
      (f(1), 1)
    })
    // Total hits per URL.
    val rdd2 = rdd1.reduceByKey(_ + _)
    // Attach the host extracted from each URL.
    val rdd3 = rdd2.map(t => {
      val url = t._1
      val host = new URL(url).getHost
      (host, url, t._2)
    })
    // Sort each host's URLs descending by hit count directly with a reversed
    // Ordering instead of sorting ascending and reversing the whole list,
    // then keep the top 3.
    val rdd4 = rdd3.groupBy(_._1).mapValues(it => {
      it.toList.sortBy(_._3)(Ordering.Int.reverse).take(3)
    })
    println(rdd4.collect().toBuffer)
    sc.stop()
  }
}
ArrayBuffer((net.itcast.cn,List((net.itcast.cn,http://net.itcast.cn/net/course.shtml,521), (net.itcast.cn,http://net.itcast.cn/net/video.shtml,521),
(net.itcast.cn,http://net.itcast.cn/net/teacher.shtml,512))), (java.itcast.cn,List((java.itcast.cn,http://java.itcast.cn/java/course/cloud.shtml,1028),
(java.itcast.cn,http://java.itcast.cn/java/course/javaee.shtml,1000), (java.itcast.cn,http://java.itcast.cn/java/course/base.shtml,543))),
(php.itcast.cn,List((php.itcast.cn,http://php.itcast.cn/php/video.shtml,490), (php.itcast.cn,http://php.itcast.cn/php/teacher.shtml,464),
(php.itcast.cn,http://php.itcast.cn/php/course.shtml,459))))
3 改进版
上个版本采用 List 存数据,如果数据太多就会崩溃;改用 RDD 后,当数据量大时,它会把数据溢写到磁盘上。
3.1 测试1
package webcount
import java.net.URL
import org.apache.spark.{SparkConf, SparkContext}
object AdvUrlCount {
  /**
   * Counts visits per URL from a tab-separated access log and prints the
   * (host, url, hits) triples for every URL.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AdvURLCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // One (url, 1) pair per log line; the URL is the second TAB field.
    val urlPairs = sc.textFile("d://urlcount.log").map { line =>
      val fields = line.split("\t")
      (fields(1), 1)
    }
    // Aggregate to total hits per URL.
    val hitCounts = urlPairs.reduceByKey(_ + _)
    // Tag each (url, hits) pair with the host parsed from the URL.
    val hostUrlHits = hitCounts.map { case (url, hits) =>
      (new URL(url).getHost, url, hits)
    }
    println(hostUrlHits.collect().toBuffer)
    // val rddjava = hostUrlHits.filter(_._1 == "java.itcast.cn")
    sc.stop()
  }
}
ArrayBuffer((php.itcast.cn,http://php.itcast.cn/php/course.shtml,459), (java.itcast.cn,http://java.itcast.cn/java/course/base.shtml,543),
(java.itcast.cn,http://java.itcast.cn/java/video.shtml,496), (java.itcast.cn,http://java.itcast.cn/java/course/android.shtml,501),
(net.itcast.cn,http://net.itcast.cn/net/video.shtml,521), (java.itcast.cn,http://java.itcast.cn/java/course/hadoop.shtml,506),
(net.itcast.cn,http://net.itcast.cn/net/course.shtml,521), (java.itcast.cn,http://java.itcast.cn/java/course/cloud.shtml,1028),
(php.itcast.cn,http://php.itcast.cn/php/video.shtml,490), (java.itcast.cn,http://java.itcast.cn/java/teacher.shtml,482),
(php.itcast.cn,http://php.itcast.cn/php/teacher.shtml,464), (net.itcast.cn,http://net.itcast.cn/net/teacher.shtml,512),
(java.itcast.cn,http://java.itcast.cn/java/course/javaee.shtml,1000), (java.itcast.cn,http://java.itcast.cn/java/course/javaeeadvanced.shtml,477))
3.2 测试2
....
// Keep only the triples whose host is "java.itcast.cn".
// NOTE: rdd3 is the (host, url, hits) RDD built in the previous listing.
val rddjava = rdd3.filter(_._1 == "java.itcast.cn")
println(rddjava.collect().toBuffer)
ArrayBuffer((java.itcast.cn,http://java.itcast.cn/java/course/base.shtml,543), (java.itcast.cn,http://java.itcast.cn/java/video.shtml,496),
(java.itcast.cn,http://java.itcast.cn/java/course/android.shtml,501), (java.itcast.cn,http://java.itcast.cn/java/course/hadoop.shtml,506),
(java.itcast.cn,http://java.itcast.cn/java/course/cloud.shtml,1028), (java.itcast.cn,http://java.itcast.cn/java/teacher.shtml,482),
(java.itcast.cn,http://java.itcast.cn/java/course/javaee.shtml,1000), (java.itcast.cn,http://java.itcast.cn/java/course/javaeeadvanced.shtml,477))
3.3 测试3
// Keep only the triples whose host is "java.itcast.cn"
// (rdd3 is the (host, url, hits) RDD from the previous listing).
val rddjava = rdd3.filter(_._1 == "java.itcast.cn")
// Sort descending by hit count (ascending = false) and take the top 3.
val sortedjava = rddjava.sortBy(_._3, false).take(3)
println(sortedjava.toBuffer)
ArrayBuffer((java.itcast.cn,http://java.itcast.cn/java/course/cloud.shtml,1028),
(java.itcast.cn,http://java.itcast.cn/java/course/javaee.shtml,1000),
(java.itcast.cn,http://java.itcast.cn/java/course/base.shtml,543))
3.4 测试4
package webcount
import java.net.URL
import org.apache.spark.{SparkConf, SparkContext}
object AdvUrlCount {
  /**
   * For each configured institute host, prints the top-3 most visited URLs
   * (sorted descending by hit count) from a tab-separated access log.
   */
  def main(args: Array[String]): Unit = {
    // Institute hosts to report on — in a real system these rules would be
    // loaded from a database.
    val arr = Array("java.itcast.cn", "php.itcast.cn", "net.itcast.cn")
    val conf = new SparkConf().setAppName("AdvURLCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Split each line on TAB and emit (url, 1).
    val rdd1 = sc.textFile("d://urlcount.log").map(line => {
      val f = line.split("\t")
      (f(1), 1)
    })
    // Total hits per URL.
    val rdd2 = rdd1.reduceByKey(_ + _)
    // Attach the host extracted from each URL.
    val rdd3 = rdd2.map(t => {
      val url = t._1
      val host = new URL(url).getHost
      (host, url, t._2)
    })
    // println(rdd3.collect().toBuffer)
    // The loop below triggers one Spark job per institute; without caching,
    // every iteration re-reads the log file and redoes the aggregation.
    rdd3.cache()
    for (institute <- arr) {
      val rdd = rdd3.filter(_._1 == institute)
      // Descending sort by hit count, keep the top 3.
      val result = rdd.sortBy(_._3, false).take(3)
      /*
       * Store the results in a database via JDBC:
       * id: institute, URL, hit count, access date
       * */
      println(result.toBuffer)
    }
    rdd3.unpersist() // release the cached partitions before shutting down
    sc.stop()
  }
}
ArrayBuffer((java.itcast.cn,http://java.itcast.cn/java/course/cloud.shtml,1028),
(java.itcast.cn,http://java.itcast.cn/java/course/javaee.shtml,1000),
(java.itcast.cn,http://java.itcast.cn/java/course/base.shtml,543))
ArrayBuffer((php.itcast.cn,http://php.itcast.cn/php/video.shtml,490),
(php.itcast.cn,http://php.itcast.cn/php/teacher.shtml,464),
(php.itcast.cn,http://php.itcast.cn/php/course.shtml,459))
ArrayBuffer((net.itcast.cn,http://net.itcast.cn/net/video.shtml,521),
(net.itcast.cn,http://net.itcast.cn/net/course.shtml,521),
(net.itcast.cn,http://net.itcast.cn/net/teacher.shtml,512))
相关文章
- 暂无相关文章
用户点评