我理解Spark Dataframe和Pandas DataFrame都是包含行和列的数据表, 他们有什么不同吗?
举报 使用道具
| 回复

共 5 个关于本帖的回复 最后回复于 2021-9-1 17:03

沙发
内容分析应用 金牌会员 发表于 2021-8-29 09:03:51 | 只看该作者
他们之间的根本区别在于:
Python Dataframe或者excel表,通常是位于同一台电脑上的。

而Spark Dataframe是分布式大数据集合,可能跨越数千台电脑。
将数据存放在多台电脑上的原因是:因为数据量太大而不适合在一台电脑上存储,或者是因为在一台电脑上执行计算所需的时间太长
举报 使用道具
板凳
Fuller 管理员 发表于 2021-8-29 09:55:58 | 只看该作者
Spark的DataFrame和pySpark我没有实际编程实验过,但是,根据看资料,我觉得有这些类比:

除了Pandas是单机“小数据”应用场景对比Spark“大数据”多机集群应用场景外,还有:

1,我猜测Spark为了解决多机问题,要做一些像分布式数据库的数据库日志调度工作,对数据的操作是抽象成操作日志的,怎样分发,什么时候才将日志中的操作真正实施到物理数据上。应该还有一些数据缓存和提速功能,有点像单机的Hibernate中间件,估计有一套数据cache管理

2,Spark的RDD是immutable的,这个给编程模式带来很大的影响,比如,操作immutable的单向链表和树结构的时候,会深切体会到数据的拷贝和数据结构的副本,也会对函数式编程和递归留下深刻印象。从这方面我产生几个问题:首先用Python操作Spark会不会容易出错?如果用Scala的话,从语言层面做了很多确保,强制程序员要改变编程模式。如果数据结构设计不合理,immutable将大大消耗内存,就会带来大量的垃圾处理开销、数据拷贝开销。所以,我在想,如果思想中把Spark DataFrame当成Pandas DataFrame那样用,是不是不行?难道Spark DataFrame能自动应对?

另外,我有个想法:如果要做复杂的数据清洗,表合并和拆分,是不是应该先在Pandas DataFrame中装配好了以后,一次性转换成Spark DataFrame?
举报 使用道具
地板
Fuller 管理员 发表于 2021-8-29 10:00:14 | 只看该作者
应为是immutable的,数据结构不应该含有双向引用,比如,不能有双向链表,只能有单向链表,否则,一个节点的任何一个改动将导致整个数据结构中的数据都重建一遍,Spark DataFrame没有索引是不是就是因为这个?
举报 使用道具
5#
Fuller 管理员 发表于 2021-9-1 17:00:31 | 只看该作者
我查到这样一篇文章《Pyspark-dataframe使用22字诀》,我看到有这么一段代码:
  1.     from pyspark.sql import functions as fn
  2.     1. 查找: 根据需要选择列按条件进行过滤
  3.     test = spark.createDataFrame([['a', 0,2], ['b', 0, 0], ['c', 4, 4],
  4.                                  ['c', 0,2], ['c', 1,0]], schema=['a', 'b', 'c'])
  5.     test.show()
  6.     # 按条件进行过滤可以选择filter也可以选择where
  7.     test.select('a').filter(test['a'] != 2).show()
  8.     test.filter((test['a'] != 0) | (test['c'] != 0)).show()

  9.     # 多次filter条件,按区间选择值,对列按照sql语句进行选择
  10.     test.filter(test.b.betwen(1,4)).filter("a like 'a%'").show()

  11.     # 这里需要注意的事,如果通过withcolumn创建列了新的列需要通过function.col函数来指定列
  12.     test.withColumn('aa',fn.size(test['a'])).filter((fn.col('aa') > 1))

  13.     # 也可以通过选择条件赋值给新的列在进行过滤
  14.     test.withColumn("aa", when((test['a'] == 0)
  15.                 && (test['b'] == 0, 0).otherwise(1)).filter(fn.col('aa') == 1)

  16.     # 或者可以创建一个临时表,在临时表上使用sql
  17.     test.createOrReplaceTempView("test_df")
  18.     spark.sql("select distinct(a) from test_df").show()

  19.    
  20.     2. 修改/增加新列:withcolumn, 比如对string通过正则表达式进行过滤
  21.     test = test.withcolumn('aa', fn.regexp_replace('query', "[^\u4e00-\u9fa5\u0030-\u0039\u0041-\u005a\u0061-\u007a]", ""))

  22.     3. 删除一列: drop
  23.     test = test.drop('a')
  24.     # 也可以直接选择自己需要的一些列
  25.     test = test.select(['a', 'b'])
复制代码
修改操作,包括修改表结构,看来都不是in-place的,而是修改以后返回一个新的数据集,应该是为了确保数据都是immutable的
举报 使用道具
6#
Fuller 管理员 发表于 2021-9-1 17:03:52 | 只看该作者
还是来自上一帖那篇文章,rdd和dataframe是两种操作方式,rdd操作也是immutable的,而且都用函数式编程,传入一个匿名函数,对数据进行处理后,生成一个新的rdd。
  1. 1. 从集合创建(序列化)
  2. rdd = sc.parallelize([1,2,3,4], 1)
  3. rdd.collect() # 其中collect方法就是将rdd转为list可以理解为反序列化

  4. 2. 或者自行创建
  5. aa_list = [[1,2], [3,4]]
  6. df = spark.createDataFrame(query_core_list, schema=['col1', 'col2'])

  7. 3. 从hdfs上面读取,这个时候需要注意的是,这个对象应该是sparkcontext,比如下所示
  8. sc_conf = SparkConf().setAppName('hello')
  9. spark = SparkSession.builder.config(conf=sc_conf).enableHiveSupport().getOrCreate()
  10. sc = spark.sparkContext # 需要用sc去读取hdfs而不是sql
  11. core_rdd = sc.textFile('afs://hdfs地址:端口号/路径/*/*')
  12. news = core_rdd.map(lambda line: line.split('\t')).map(lambda p: Row(query = p[0], core_query = p[1]))
  13. # 将rdd变成dataframe格式数据
  14. df_core = spark.createDataFrame(news)

  15. 4. 从hive表中读取数据
  16. sc_conf = SparkConf().setAppName('test')
  17. spark = SparkSession.builder.config(conf=sc_conf).enableHiveSupport().getOrCreate()
  18. sc = spark.sparkContext
  19. sql_query = """select a.id,  a.query from 表地址.表名 a \
  20.             where a.event_day = {} and
  21.             (a.a = 'a' or
  22.             (a.b = 'b' and c in ('s','s/index','shop/index','shopindex','c','c/index'))) and
  23.             a.d != 'd' and
  24.             a.e not like '%有限公司%' and
  25.             a.f not like '%http%' and
  26.             a.h != "x" and
  27.             a.g != "x" and
  28.             length(a.a) < 25
  29.             group by a.d, a.f
  30.             """.format(day)
  31. df_res = spark.sql(sql_query)
复制代码


举报 使用道具
您需要登录后才可以回帖 登录 | 立即注册

精彩推荐

  • Gephi社会网络分析-马蜂窝游记文本分词并同
  • Gephi社会网络分析-基于马蜂窝游记文本以词
  • 知乎话题文本根据词语间距筛选后生成共词矩
  • 马蜂窝游记文本分词后以词语间距为筛选条件
  • 学习使用apriori算法挖掘关联关系

热门用户

GMT+8, 2024-4-20 07:32