Problem description
I need to query the temporary view created from a DataFrame many times, so I cached it, but the cache doesn't seem to take effect.
The data set is fairly large, about 10 billion records, so I need to make the job more efficient.
Background and what I have tried
Before this, I also tried submitting the Spark jobs repeatedly from multiple threads, again with the temporary view cached, but that didn't seem to help either.
Related code
This is how I normally submit the Spark job. The code is as follows:
import org.apache.spark.sql.DataFrame
import scala.collection.mutable.ArrayBuffer

val data: DataFrame = sparkSession.read.parquet("XXX")
data.createOrReplaceTempView("table_info")
// Mark the view as cached (lazy: nothing is stored until the first action runs)
sparkSession.catalog.cacheTable("table_info")

// Build the SQL for each feature combination and run it
val featureArray: ArrayBuffer[String] = StrsUtils.generateFeatures("type,`from`,page,value,source")
for (i <- 0 to 3) {
  val all_type = featureArray(i)
  val sql_merge =
    s"""
       | SELECT
       | appid, soft_version, id, event_type, type, `from`, page, value, source,
       | count(distinct cuid) as uv,
       | sum(pv) AS pv,
       | sum(duration) AS duration
       | FROM(
       | SELECT cuid, appid, id, event_type, soft_version, duration, pv, event_day, $all_type
       | FROM table_info
       | )tmp
       | GROUP BY appid, soft_version, id, event_type, type, `from`, page, value, source
       |
     """.stripMargin
  logger.info("merge_sql: " + sql_merge)
  sparkSession.sql(sql_merge).repartition(1).write.mode("overwrite").parquet(s"XXX/result/$event_day/" + i)
}

// Clear the cache
sparkSession.catalog.clearCache()
sparkSession.stop()
What result do you expect? What do you actually see?
Stage and stage DAG (screenshots):
This stage takes about 3-4 minutes. I expected the first loop iteration to populate the cache and the subsequent iterations to read directly from it, but the DAG for the later iterations is identical to the first one and they take about the same time, so I suspect that caching the temporary view is not working.
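One way to check this directly, rather than inferring it from stage timings, is to ask the catalog whether the view is registered as cached and to look for an `InMemoryRelation` node in the optimized plan of a query. The following is only a minimal sketch under stated assumptions: it runs against a local session and a tiny in-memory stand-in for the real Parquet input, and the `CacheCheck` object and its `usesCache` helper are hypothetical names introduced for illustration, not part of the job above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object, for illustration only.
object CacheCheck {

  // True if the optimized plan of `query` reads from a cached relation.
  def usesCache(spark: SparkSession, query: String): Boolean =
    spark.sql(query).queryExecution.optimizedPlan.toString.contains("InMemoryRelation")

  def main(args: Array[String]): Unit = {
    // Assumption: a local session; the real job would reuse its own sparkSession.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("cache-check")
      .getOrCreate()
    import spark.implicits._

    // Assumption: a tiny stand-in for the real Parquet data.
    val data = Seq(("c1", "app1", 1L), ("c2", "app1", 2L), ("c1", "app2", 3L))
      .toDF("cuid", "appid", "pv")
    data.createOrReplaceTempView("table_info")

    // catalog.cacheTable is lazy: it only marks the view as cacheable.
    spark.catalog.cacheTable("table_info")
    // Running an action on the view materializes the cached data.
    spark.table("table_info").count()

    val query =
      "SELECT appid, count(DISTINCT cuid) AS uv, sum(pv) AS pv FROM table_info GROUP BY appid"

    println("registered as cached: " + spark.catalog.isCached("table_info"))
    println("plan reads from cache: " + usesCache(spark, query))

    spark.stop()
  }
}
```

If both checks report true for the real job, the cache is being used by the planner, and the Storage tab of the Spark UI can then show what fraction of the data was actually stored; with an input of this size it may be worth confirming that fraction before concluding that the cache itself is not working.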