How can I efficiently join a large RDD to a very large RDD in Spark?
I have two RDDs. One RDD has between 5-10 million entries, and the other RDD has between 500-750 million entries. At some point, I have to join these two RDDs using a common key.
val rddA = someData.rdd.map { x => (x.key, x) } // 10 million
val rddB = someData.rdd.map { y => (y.key, y) } // 600 million
var joinRdd = rddA.join(rddB)
When Spark plans this join, it decides to do a ShuffledHashJoin. This causes many of the items in rddB to be shuffled across the network, and likewise some of rddA is shuffled across the network. In this case, rddA is too "big" to use as a broadcast variable, but it seems like a BroadcastHashJoin would be more efficient. Is there a way to hint Spark to use a BroadcastHashJoin? (Apache Flink supports this through join hints.)
If not, is the only option to increase the autoBroadcastJoinThreshold?
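For reference, a broadcast-style (map-side) join can be emulated by hand at the RDD level when the smaller side does fit in memory. This is only a rough sketch, assuming rddA can be collected to the driver and that sc is the active SparkContext; there is no built-in join hint in the RDD API itself.

// Rough sketch of a manual map-side (broadcast) join; assumes rddA fits in driver memory.
val smallMap = rddA.collectAsMap()      // bring the smaller side to the driver
val smallBc  = sc.broadcast(smallMap)   // ship it once to every executor

val manualJoin = rddB.mapPartitions { iter =>
  val lookup = smallBc.value
  iter.flatMap { case (k, b) =>
    lookup.get(k).map(a => (k, (a, b))) // keep only matching keys (inner-join semantics)
  }
}

The large RDD never moves across the network with this approach; only the broadcasted map is shipped, once per executor.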
Update 7/14
My performance issue appears to be squarely rooted in repartitioning. Normally, an RDD read from HDFS is partitioned by block, but in this case, the source was a Parquet datasource [that I made]. When Spark (Databricks) writes a Parquet file, it writes one file per partition, and identically, it reads one partition per file. So, the best answer I've found is that during production of the datasource, partition it by the key, then write out the Parquet sink (which is then naturally co-partitioned) and use that as rddB.
The answer given is correct, but I think the details about the Parquet datasource may be useful to someone else.
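A rough sketch of that write side, assuming a DataFrame named someDataDf, a join column named "key", an existing SQLContext named sqlContext (on Spark 2.x use the SparkSession instead), and a Spark version whose DataFrame repartition accepts column expressions; the path and the 1000-partition count are purely illustrative:

import org.apache.spark.sql.functions.col

// Repartition by the join key before writing, so each output Parquet file
// (and therefore each partition on read) holds one hash bucket of keys.
someDataDf
  .repartition(1000, col("key"))
  .write
  .parquet("/path/to/someData.parquet")

// Reading it back gives one partition per file, already grouped by key:
val rddBFromParquet = sqlContext.read.parquet("/path/to/someData.parquet")
  .rdd
  .map(row => (row.getAs[String]("key"), row))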
You can partition the RDDs with the same partitioner; in that case, partitions with the same key are collocated on the same executor.
In that case you avoid a shuffle for the join operations.
The shuffle will happen only once, when you apply the partitioner, and if you cache the RDDs, all joins after that should be local to the executors.
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

class A
class B

val rddA: RDD[(String, A)] = ???
val rddB: RDD[(String, B)] = ???

val partitioner = new HashPartitioner(1000)

// partitionBy returns a new RDD, so keep references to the partitioned, cached RDDs
val rddAPart = rddA.partitionBy(partitioner).cache()
val rddBPart = rddB.partitionBy(partitioner).cache()
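A usage sketch following on from the snippet above: once both sides are partitioned with the same partitioner and cached, the join itself should not shuffle either side again.

// Both inputs share `partitioner`, so this join is co-partitioned: no extra shuffle.
val joinedRdd = rddAPart.join(rddBPart)
joinedRdd.count()   // forces evaluation; later joins reuse the cached, co-partitioned data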
Also, you can try increasing the broadcast threshold size; maybe rddA can be broadcasted:
--conf spark.sql.autoBroadcastJoinThreshold=300000000   # ~300 MB
We use 400 MB for broadcast joins, and it works well.
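The same threshold can also be set programmatically; a small sketch, assuming an existing SQLContext named sqlContext (on Spark 2.x the equivalent is spark.conf.set on the SparkSession):

// Threshold is in bytes and is passed as a string through the SQLContext API (~300 MB here).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "300000000")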