apache spark - Add a column from another DataFrame


In Scala Spark, you can add a column from another DataFrame to an existing DataFrame by writing:

val newdf = df.withColumn("date_min", anotherdf("date_min"))

Doing the same in PySpark results in an AnalysisException.

Here is what I am doing:

mindf.show(5)
maxdf.show(5)

+--------------------+
|            date_min|
+--------------------+
|2016-11-01 10:50:...|
|2016-11-01 11:46:...|
|2016-11-01 19:23:...|
|2016-11-01 17:01:...|
|2016-11-01 09:00:...|
+--------------------+
only showing top 5 rows

+--------------------+
|            date_max|
+--------------------+
|2016-11-01 10:50:...|
|2016-11-01 11:46:...|
|2016-11-01 19:23:...|
|2016-11-01 17:01:...|
|2016-11-01 09:00:...|
+--------------------+
only showing top 5 rows

and then this results in an error:

newdf = mindf.withColumn("date_max", maxdf["date_max"])

AnalysisException                         Traceback (most recent call last)
<ipython-input-13-7e19c841fa51> in <module>()
      2 maxdf.show(5)
      3
----> 4 newdf = mindf.withColumn("date_max", maxdf["date_max"])

/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1491         """
   1492         assert isinstance(col, Column), "col should be Column"
-> 1493         return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
   1494
   1495     @ignore_unicode_prefix

/opt/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u'resolved attribute(s) date_max#67 missing from date_min#66 in operator !Project [date_min#66, date_max#67 AS date_max#106];;\n!Project [date_min#66, date_max#67 AS date_max#106]\n+- Project [date_min#66]\n   +- Project [cast((cast(date_min#6L as double) / cast(1000 as double)) as timestamp) AS date_min#66, cast((cast(date_max#7L as double) / cast(1000 as double)) as timestamp) AS date_max#67]\n      +- SubqueryAlias df, `df`\n         +- LogicalRDD [idvisiteur#5, date_min#6L, date_max#7L, sales_sum#8, sales_count#9L]\n'

The short answer is that this is not supported by the Spark DataFrame API, at least not in Spark 2.x: a column expression such as maxdf["date_max"] can only be resolved against the DataFrame it came from, which is why the analyzer reports the attribute as missing. However, you can write a helper function to achieve something similar.

First, let's create some test data:

mindf = sc.parallelize(['2016-11-01', '2016-11-02', '2016-11-03']).map(lambda x: (x, )).toDF(['date_min'])
maxdf = sc.parallelize(['2016-12-01', '2016-12-02', '2016-12-03']).map(lambda x: (x, )).toDF(['date_max'])

You can then use zip to combine the two DataFrames, provided both DataFrames are partitioned identically:

from pyspark.sql.types import StructType

def zip_df(l, r):
    # Pair up rows from both RDDs positionally and rebuild a DataFrame
    # with one column taken from each side.
    return l.rdd.zip(r.rdd) \
        .map(lambda x: (x[0][0], x[1][0])) \
        .toDF(StructType([l.schema[0], r.schema[0]]))

combined = zip_df(mindf, maxdf.select('date_max'))
combined.show()
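Note that RDD.zip requires both RDDs to have the same number of partitions and the same number of elements in each partition, otherwise it fails at runtime. If you can't guarantee that, another option would be something along these lines (an untested sketch, not part of the zip approach above; the helper name zip_df_by_index is made up): attach an explicit row index with zipWithIndex and join on it.

def zip_df_by_index(l, r):
    # Column-bind two DataFrames by row order, using an explicit index
    # instead of relying on identical partitioning.
    def with_index(df):
        # zipWithIndex yields (Row, index); flatten to (index, *row values)
        rdd = df.rdd.zipWithIndex().map(lambda x: (x[1],) + tuple(x[0]))
        return rdd.toDF(['_row_idx'] + df.columns)

    li = with_index(l)
    ri = with_index(r)
    return li.join(ri, on='_row_idx').orderBy('_row_idx').drop('_row_idx')

combined2 = zip_df_by_index(mindf, maxdf)
combined2.show()

This is more expensive than the zip version (it adds a join and a sort), but it does not depend on how the inputs happen to be partitioned.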
