翼度科技»论坛 编程开发 JavaScript 查看内容

基于Spark的大规模日志分析

14

主题

14

帖子

42

积分

新手上路

Rank: 1

积分
42
摘要:本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解。
本文分享自华为云社区《【实战经验分享】基于Spark的大规模日志分析【上进小菜猪大数据系列】》,作者:上进小菜猪。
随着互联网的普及和应用范围的扩大,越来越多的应用场景需要对海量数据进行高效地处理和分析,这就要求我们必须具备大数据技术方面的知识和技能。本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解。
1.数据来源

我们的项目是针对某购物网站的访问日志进行分析,其中主要包含以下几个字段:

  • IP:访问的客户端 IP 地址
  • Time:访问时间
  • Url:访问的 URL 地址
  • User-Agent:浏览器标识符
原始数据规模约为 100GB,我们需要对其进行清洗、统计和分析,以得到有用的信息和价值。
2. 数据清洗

由于原始数据存在缺失值、异常值、重复值等问题,因此我们需要进行数据清洗,主要包括以下步骤:

  • 将原始数据进行格式转换,方便后续处理
  • 对 IP、Time、Url 和 User-Agent 字段进行解析和提取
  • 去除不合法的记录和重复的记录
具体代码实现如下:
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import java.text.SimpleDateFormat
  3. import java.util.Locale
  4. object DataCleaning {
  5.   def main(args: Array[String]) {
  6.     val conf = new SparkConf().setAppName("DataCleaning")
  7.     val sc = new SparkContext(conf)
  8.     val data = sc.textFile("hdfs://master:9000/log/access.log")
  9. // 定义时间格式及地区信息
  10.     val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
  11. // 数据清洗
  12.     val cleanData = data.map(line => {
  13.       val arr = line.split(" ")
  14. if (arr.length >= 9) {
  15. // 解析 IP
  16.         val ip = arr(0)
  17. // 解析时间,转换为 Unix 时间戳
  18.         val time = dateFormat.parse(arr(3) + " " + arr(4)).getTime / 1000
  19. // 解析 URL
  20.         val url = urlDecode(arr(6))
  21. // 解析 UserAgent
  22.         val ua = arr(8)
  23. (ip, time, url, ua)
  24. }
  25. }).filter(x => x != null).distinct()
  26. // 结果输出
  27.     cleanData.saveAsTextFile("hdfs://master:9000/cleanData")
  28.     sc.stop()
  29. }
  30. // URL 解码
  31.   def urlDecode(url: String): String = {
  32.     java.net.URLDecoder.decode(url, "utf-8")
  33. }
  34. }
复制代码
3. 数据统计

对于大规模数据的处理,我们可以使用 Spark 提供的强大的分布式计算能力,以提高处理效率和减少计算时间。
我们这里使用 Spark SQL 统计每个 URL 的访问量,并输出前 10 个访问量最高的 URL,代码如下:
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.sql.SQLContext
  3. case class LogRecord(ip: String, time: Long, url: String, ua: String)
  4. object DataAnalysis {
  5.   def main(args: Array[String]) {
  6.     val conf = new SparkConf().setAppName("DataAnalysis")
  7.     val sc = new SparkContext(conf)
  8.     val sqlContext = new SQLContext(sc)
  9. // 读取清洗后的数据
  10.     val cleanData = sc.textFile("hdfs://master:9000/cleanData").filter(x => x != null)
  11. // 将数据转换为 DataFrame
  12. import sqlContext.implicits._
  13.     val logDF = cleanData.map(_.split(",")).map(p => LogRecord(p(0), p(1).toLong, p(2), p(3))).toDF()
  14. // 统计每个 URL 的访问量,并按访问量降序排序
  15.     val topUrls = logDF.groupBy("url").count().sort($"count".desc)
  16. // 输出前 10 个访问量最高的 URL
  17.     topUrls.take(10).foreach(println)
  18.     sc.stop()
  19. }
  20. }
复制代码
4. 数据可视化

数据可视化是将处理和分析后的数据以图表或图像的方式展示出来,有利于我们直观地观察数据的规律和趋势。
我们这里采用 Python 的 Matplotlib 库将前 10 个访问量最高的 URL 可视化,代码如下:
  1. import matplotlib.pyplot as plt
  2. # 读取数据
  3. with open('topUrls.txt', 'r') as f:
  4.     line = f.readline()
  5.     urls = []
  6.     counts = []
  7. while line and len(urls) < 10:
  8.         url, count = line.strip().split(',')
  9.         urls.append(url)
  10.         counts.append(int(count))
  11.         line = f.readline()
  12. # 绘制直方图
  13. plt.bar(range(10), counts, align='center')
  14. plt.xticks(range(10), urls, rotation=90)
  15. plt.xlabel('Url')
  16. plt.ylabel('Count')
  17. plt.title('Top 10 Url')
  18. plt.show()
复制代码
在进行数据清洗前,需要先对原始日志数据进行筛选,选取需要分析的字段。然后进行数据清洗,去掉不必要的空格、特殊字符等,使数据更加规整,并增加可读性。
下面是数据清洗的代码示例:
  1. val originalRdd = spark.sparkContext.textFile("path/to/logfile")
  2. val filteredRdd = originalRdd.filter(line => {
  3.   val tokens = line.split("\t")
  4.   tokens.length >= 10 &&
  5. tokens(0).matches("\d{4}-\d{2}-\d{2}") &&
  6. tokens(1).matches("\d{2}:\d{2}:\d{2}") &&
  7. tokens(2).matches("\d+") &&
  8. tokens(3).matches("\d+") &&
  9. tokens(4).matches("\d+") &&
  10. tokens(5).matches("\d+") &&
  11. tokens(6).matches(".+") &&
  12. tokens(7).matches(".+") &&
  13. tokens(8).matches(".+") &&
  14. tokens(9).matches(".+")
  15. })
  16. val cleanedRdd = filteredRdd.map(line => {
  17.   val tokens = line.split("\t")
  18.   val timestamp = s"${tokens(0)} ${tokens(1)}"
  19.   val request = tokens(6).replaceAll(""", "")
  20.   val responseCode = tokens(8).toInt
  21. (timestamp, request, responseCode)
  22. })
复制代码
​在上述代码中,我们首先读取原始日志数据,并使用filter函数过滤掉不符合条件的行;然后使用map函数将数据转换为元组的形式,并进行清洗。其中,元组的三个元素分别是时间戳、请求内容和响应状态码。
接下来,让我们来介绍一下如何使用Spark进行数据统计。
数据统计是大规模数据分析中非常重要的一个环节。Spark提供了丰富的聚合函数,可用于对数据进行各种统计分析。
下面是对清洗后的数据进行统计分析的代码示例:
  1. import org.apache.spark.sql.functions._
  2. val df = spark.createDataFrame(cleanedRdd).toDF("timestamp", "request", "responseCode")
  3. val totalCount = df.count()
  4. val errorsCount = df.filter(col("responseCode") >= 400).count()
  5. val successCount = totalCount - errorsCount
  6. val topEndpoints = df.groupBy("request").count().orderBy(desc("count")).limit(10)
  7. topEndpoints.show()
复制代码
在上面的代码中,我们首先将清洗后的数据转换为DataFrame,然后使用count函数计算总记录数和错误记录数,并计算成功记录数。最后使用groupBy和orderBy函数按照请求内容,对数据进行分组统计,并打印出请求次数最多的前10个端点。
通过可视化,我们可以清楚地看到前 10 个访问量最高的 URL 地址及其访问量,这对于进一步分析和优化网站的性能和用户体验具有重要的意义。
总结起来,这就是我们的一个大数据实战项目,我们使用 Spark 统计了购物网站的访问量,并通过 Python 的 Matplotlib 库将结果可视化。这个过程中,我们运用了数据清洗、Spark SQL 统计和可视化等技术,为大规模数据的处理和分析提供了有效的解决方案。
 
点击关注,第一时间了解华为云新鲜技术~

来源:https://www.cnblogs.com/huaweiyun/p/17482314.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具