Spring Batch annotation-based partitioner issue
I am trying to create a Spring Batch application using the annotation-based approach with a partitioner, triggered by a Quartz scheduler, and I am running into the following issues.
When the job is triggered, each partition is executed sequentially instead of in parallel, i.e. if there are 10 partitions, instead of all 10 being triggered/processed at once they are processed one by one.
When more than one instance of the job is triggered (this is needed per requirement), the instances are not synchronized, i.e. when the 2nd instance starts it uses the 1st instance's data, and the 1st instance stops processing even though it is still active.
Below are the configuration/class files.
BatchConfiguration.java -
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Bean
    @StepScope
    public JdbcCursorItemReader reader(@Value("#{stepExecutionContext[someParam]}") String someParam) {
        JdbcCursorItemReader jdbcCursorItemReader = new JdbcCursorItemReader();
        jdbcCursorItemReader.setDataSource(getDataSource());
        jdbcCursorItemReader.setSql("myQuery");
        jdbcCursorItemReader.setRowMapper(new NotifRowMapper());
        return jdbcCursorItemReader;
    }

    @Bean
    @StepScope
    public MyProcessor processor() {
        return new MyProcessor();
    }

    @Bean
    public MyPartitioner partitioner() {
        MyPartitioner partitioner = new MyPartitioner();
        partitioner.setDataSource(getDataSource());
        partitioner.setSql("myPartitionerQuery");
        return partitioner;
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter writer(DataSource dataSource) {
        JdbcBatchItemWriter writer = new JdbcBatchItemWriter();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        writer.setSql("myWriterQuery");
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public Job springBatch() {
        return jobBuilders.get("springBatch").start(masterStep()).build();
    }

    @Bean
    public Step masterStep() {
        return stepBuilders.get("masterStep")
                .partitioner(slave(reader(null), writer(getDataSource()), processor()))
                .partitioner("slave", partitioner())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step slave(JdbcCursorItemReader reader, JdbcBatchItemWriter writer, MyProcessor processor) {
        return stepBuilders.get("slave")
                .chunk(100)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Bean
    public DataSource getDataSource() {
        return dataSource;
    }

    @Bean
    public JobRepository getJobRepository() throws Exception {
        MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
        factory.setTransactionManager(new ResourcelessTransactionManager());
        factory.afterPropertiesSet();
        factory.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
        return (JobRepository) factory.getObject();
    }
}
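One thing that could explain the sequential execution: ThreadPoolTaskExecutor defaults to a core pool size of 1 with an unbounded queue, so when only maxPoolSize is set the extra partitions are queued instead of being run on new threads. A minimal sketch of the executor bean with the core pool size raised (the sizes below are illustrative, not values from the configuration above):

// Sketch only: raise corePoolSize so partitions actually run in parallel.
@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(10);  // illustrative; the default of 1 serializes the partitions
    taskExecutor.setMaxPoolSize(20);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

Separately, MapJobRepositoryFactoryBean creates an in-memory job repository that is only visible inside the application context that created it; see the note after QuartzJob.java below.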
QuartzJob.java (triggers the Spring Batch job) -
public class QuartzJob implements org.quartz.Job {

    @Override
    public void execute(org.quartz.JobExecutionContext jobExecutionContext) throws org.quartz.JobExecutionException {
        AnnotationConfigApplicationContext context;
        try {
            context = new AnnotationConfigApplicationContext(BatchConfiguration.class);
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            org.springframework.batch.core.Job newJob = context.getBean("springBatch", org.springframework.batch.core.Job.class);
            JobParameters param = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(newJob, param);
        } catch (Exception e) {
        }
    }
}
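This may be the root of the second issue: execute() builds a brand-new AnnotationConfigApplicationContext on every Quartz fire, so every run gets its own copies of the beans, including its own in-memory MapJobRepository, and the runs have no shared state to synchronize on. A minimal sketch of one way around that, assuming the context is built once at start-up (for example in the listener below) and exposed through a hypothetical BatchContextHolder helper:

// Sketch only: reuse one Spring context across Quartz fires.
// BatchContextHolder is a hypothetical helper that stores the context built at start-up.
public class QuartzJob implements org.quartz.Job {

    @Override
    public void execute(org.quartz.JobExecutionContext jobExecutionContext) throws org.quartz.JobExecutionException {
        try {
            org.springframework.context.ApplicationContext context = BatchContextHolder.getContext();
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            org.springframework.batch.core.Job newJob = context.getBean("springBatch", org.springframework.batch.core.Job.class);
            JobParameters param = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(newJob, param);
        } catch (Exception e) {
            throw new org.quartz.JobExecutionException(e);
        }
    }
}

With a single shared context, replacing the in-memory job repository with a database-backed JobRepository would also let concurrently running instances see each other's executions.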
MyQuartzListener.java (class that schedules the Quartz job during server start-up) -
public class MyQuartzListener implements ServletContextListener {

    private Scheduler scheduler;

    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
    }

    @Override
    public void contextInitialized(ServletContextEvent ctx) {
        JobDetail job = JobBuilder.newJob(QuartzJob.class)
                .withIdentity("springBatch", "springBatch")
                .build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("springBatch", "springBatch")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(60).repeatForever())
                .build();
        try {
            scheduler = ((StdSchedulerFactory) ctx.getServletContext()
                    .getAttribute(QuartzInitializerListener.QUARTZ_FACTORY_KEY)).getScheduler();
            job.getJobDataMap().put("quartzTime", System.currentTimeMillis());
            scheduler.scheduleJob(job, trigger);
        } catch (SchedulerException e) {
        }
    }
}
MyPartitioner.java -
public class MyPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
        List<String> partitionCodes = getPartitionCodes(sql);
        int count = 1;
        for (String partitionCode : partitionCodes) {
            ExecutionContext context = new ExecutionContext();
            context.put("partitionCode", partitionCode);
            context.put("name", "Thread" + count);
            partitionMap.put(partitionCode, context);
            count++;
        }
        return partitionMap;
    }
}
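One mismatch worth checking between this class and the reader bean in BatchConfiguration.java: the partitioner stores the value under the key "partitionCode", while the reader binds #{stepExecutionContext[someParam]}, so unless "someParam" is populated elsewhere the step-scoped reader receives null. A sketch of a reader bound to the key the partitioner actually writes (the SQL is still the placeholder from above):

// Sketch only: bind the late-bound parameter to the key set by MyPartitioner.
@Bean
@StepScope
public JdbcCursorItemReader reader(@Value("#{stepExecutionContext['partitionCode']}") String partitionCode) {
    JdbcCursorItemReader jdbcCursorItemReader = new JdbcCursorItemReader();
    jdbcCursorItemReader.setDataSource(getDataSource());
    jdbcCursorItemReader.setSql("myQuery");  // would normally filter rows by partitionCode
    jdbcCursorItemReader.setRowMapper(new NotifRowMapper());
    return jdbcCursorItemReader;
}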
Is there anything wrong in this configuration? I am passing the current time to each instance of the job to identify each instance separately, but it is still not working.