Apache Spark - Why is my executor memory usage stuck at 0?
I have a pretty simple Spark job that looks like this:
JavaPairRDD<Key, Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key, Value> indexSrc = rawData.filter(new IndexFilter()).cache();
JavaPairRDD<Key, Value> indexEntries = indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key, Value> reverseIndexEntries = indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key, Value> dataEntries = rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();

dataEntries.union(indexEntries)
    .union(reverseIndexEntries)
    .repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
    .saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class, AccumuloFileOutputFormat.class, conf);
where Key and Value are Apache Accumulo's Key and Value classes (serialized with the KryoSerializer).
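I'm not sure where to put the calls to cache(), or whether they're needed at all. I'm concerned that my executors don't seem to be using any of the memory I've allocated to them: the memory usage shown for every executor stays at 0, and the "Storage" page in the application UI is empty.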
Am I doing something wrong, or has Spark decided that it can't make the job go faster by storing RDDs?
The "memory used" figure on the Executors page means memory used for caching.
In your code you perform only one action, and neither indexSrc nor dataEntries is reused by a later action, so there is no point in caching them.
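One way to see why the Storage page stays empty is a toy model of lazy caching. The sketch below is plain Java and deliberately does not use Spark: the class `ToyRdd` and its `cache()`, `count()` and `storedSize()` methods are hypothetical stand-ins for an RDD, its `cache()` call and a `count()` action. Under those assumptions it illustrates two points the answer relies on: marking a dataset as cached stores nothing by itself, and the stored copy only saves work when a later action reuses it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
 * Toy model (NOT Spark's API) of cache() semantics: requesting caching
 * stores nothing; data is stored only when an action first computes it.
 */
public class LazyCacheModel {

    /** Hypothetical stand-in for an RDD: a lazily computed list of elements. */
    static final class ToyRdd {
        private final Supplier<List<Integer>> compute; // plays the role of the lineage
        private boolean cacheRequested = false;
        private List<Integer> stored = null;           // what a "Storage" page would show
        int computeCount = 0;                          // how many times the lineage ran

        ToyRdd(Supplier<List<Integer>> compute) {
            this.compute = compute;
        }

        /** Like cache(): only records the request, computes nothing. */
        ToyRdd cache() {
            cacheRequested = true;
            return this;
        }

        /** Returns stored data if present, otherwise runs the lineage. */
        private List<Integer> materialize() {
            if (stored != null) {
                return stored;
            }
            computeCount++;
            List<Integer> result = compute.get();
            if (cacheRequested) {
                stored = result; // storage is filled only here, while an action runs
            }
            return result;
        }

        /** Like count(): an action, so it forces computation. */
        long count() {
            return materialize().size();
        }

        /** Number of elements currently held in the model's "storage memory". */
        int storedSize() {
            return stored == null ? 0 : stored.size();
        }
    }

    /** Hypothetical source dataset of five integers. */
    static ToyRdd makeSource() {
        return new ToyRdd(() -> {
            List<Integer> data = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                data.add(i);
            }
            return data;
        });
    }

    public static void main(String[] args) {
        ToyRdd cached = makeSource().cache();
        System.out.println("after cache(): stored=" + cached.storedSize()); // 0: nothing stored yet
        cached.count();
        System.out.println("after count(): stored=" + cached.storedSize()); // 5: filled by the action
        cached.count();
        System.out.println("computeCount=" + cached.computeCount);          // 1: second action reused storage

        ToyRdd uncached = makeSource();
        uncached.count();
        uncached.count();
        System.out.println("uncached computeCount=" + uncached.computeCount); // 2: recomputed each time
    }
}
```

Running `main` prints a stored size of 0 after `cache()`, 5 after the first `count()`, a compute count of 1 for the cached instance and 2 for the uncached one. Real Spark adds details this model ignores, such as per-partition storage, storage levels and eviction.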
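To prove it, you can add
indexSrc.count();
and dataEntries.count();
right after declaring them, and then check the Executors and Storage pages. cache() is lazy, so an RDD only shows up there once an action has actually computed it: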
JavaPairRDD<Key, Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key, Value> indexSrc = rawData.filter(new IndexFilter()).cache();
indexSrc.count(); // action: computes indexSrc and stores it in executor memory
JavaPairRDD<Key, Value> indexEntries = indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key, Value> reverseIndexEntries = indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key, Value> dataEntries = rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
dataEntries.count(); // action: computes dataEntries and stores it in executor memory

dataEntries.union(indexEntries)
    .union(reverseIndexEntries)
    .repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
    .saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class, AccumuloFileOutputFormat.class, conf);