您现在的位置是:首页 >学无止境 >SpringBatch的两种实现方式: Tasklet 和 Chunk网站首页学无止境
SpringBatch的两种实现方式: Tasklet 和 Chunk
直接上代码
■ 共通部分:
1. 代码结构
2. pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Edgware.SR2</spring-cloud.version>
<start-class>roy.springbatch.batTasklet.BatTasklet</start-class>
<start-class>roy.springbatch.batChunk.BatChunk</start-class>
</properties>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
3. framework/BatchAnnotation.java
package roy.springbatch.framework;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScans;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.PropertySource;
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
@Import({SimpleBatchConfiguration.class})
@EnableBatchProcessing
@ComponentScan
@ComponentScans({@ComponentScan("roy.springbatch.framework")})
@PropertySource(value = "classpath:config/jdbc-dev.properties")
public @interface BatchAnnotation {
}
4. framework/BaseModule.java
package roy.springbatch.framework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import java.util.HashMap;
import java.util.Map;
public abstract class BaseModule implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(BaseModule.class);
public static void run(Class<? extends BaseModule> module, String batchName, String[] args)
throws Exception {
SpringApplication app = new SpringApplication(module);
Map<String, Object> param = retriveArgs(batchName, args);
app.setDefaultProperties(param);
app.run(args);
}
private static Map<String, Object> retriveArgs(String batchName, String[] args){
Map<String, Object> param = new HashMap<>();
param.put("argsLength", args.length);
if (args.length>0){
param.put("targetDate", args[0]);
}
return param;
}
@Override
public void run(String... args) throws Exception{
if (null != args){
for(String arg : args){
log.info("execute module with argument : " + arg);
}
}
}
}
5. framework/BaseWriter.java
package roy.springbatch.framework;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
public abstract class BaseWriter<T> implements ItemWriter<T> {
protected StepExecution stepExecution;
@BeforeStep
public void saveStepExecution(StepExecution stepExecution){
this.stepExecution = stepExecution;
}
@Override
public void write(List<? extends T> items) throws Exception {
JobParameters params = stepExecution.getJobParameters();
ExecutionContext stepContext = stepExecution.getExecutionContext();
for(T item : items){
doWrite(item, params, stepContext);
}
}
public abstract void doWrite(T item, JobParameters params, ExecutionContext stepContext) throws Exception;
}
一. Tasklet
1. batTasklet/BatTasklet.java
package roy.springbatch.batTasklet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import roy.springbatch.framework.BaseModule;
import roy.springbatch.framework.BatchAnnotation;
@BatchAnnotation
public class BatTasklet extends BaseModule {
private static final Logger log = LoggerFactory.getLogger(BatTasklet.class);
private static final String MODULE_NAME = "BATCHTASKLET";
public static void main(String[] args) {
try {
run(BatTasklet.class, MODULE_NAME, args);
} catch (Exception e) {
log.error(MODULE_NAME + " failed.");
System.exit(1);
}
}
}
2. batTasklet/BatTaskletConfiguration.java
package roy.springbatch.batTasklet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import roy.springbatch.framework.BaseModule;
@Configuration
@EnableBatchProcessing
public class BatTaskletConfiguration extends BaseModule {
private static final Logger log = LoggerFactory.getLogger(BatTaskletConfiguration.class);
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Tasklet tasklet1;
@Autowired
private Tasklet tasklet2;
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.allowStartIfComplete(true)
.tasklet(tasklet1)
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.allowStartIfComplete(true)
.tasklet(tasklet2)
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("step-tasklet-job")
.incrementer(new RunIdIncrementer())
.start(step1())
.next(step2())
.build();
}
}
3. batTasklet/Task1.java (具体想要做的事情写在Task里面)
package roy.springbatch.batTasklet;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class Task1 {
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("--->Tasklet 1 Execute:" + System.currentTimeMillis());
return RepeatStatus.FINISHED;
}
};
}
}
4. batTasklet/Task2.java
package roy.springbatch.batTasklet;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class Task2 {
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("--->Tasklet 2 Execute:" + System.currentTimeMillis());
return RepeatStatus.FINISHED;
}
};
}
}
5. 调用方法:
在pom.xml中定义了2个start-class,用下面的方法指定启动类:
>java -cp batch-0.0.1.jar -Dloader.main=roy.springbatch.batTasklet.BatTasklet org.springframework.boot.loader.PropertiesLauncher
6. 测试:
二. Chunk
1. batChunk/listener/BatChunkListener.java
package roy.springbatch.batChunk.listener;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.stereotype.Component;
@Component
public class BatChunkListener extends JobExecutionListenerSupport {
@Override
public void beforeJob(JobExecution jobExecution){
System.out.println("--->BeforeJob Execute");
}
@Override
public void afterJob(JobExecution jobExecution){
System.out.println("--->AfterJob Execute");
}
}
2. batChunk/reader/BatChunkReader.java
package roy.springbatch.batChunk.reader;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import java.util.HashMap;
import java.util.Map;
public class BatChunkReader implements ItemReader<Map<String, String>> {
private int stepCount = 0;
private Map<String, String> listMap = new HashMap<>();
@Override
public Map<String, String> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if(stepCount<3){
stepCount++;
System.out.println("--->Reader Execute: read from DB");
listMap.put("A1", "1");
listMap.put("A2", "2");
listMap.put("A3", "3");
return listMap;
}else {
return null;
}
}
}
3. batChunk/writer/BatChunkWriter.java
package roy.springbatch.batChunk.writer;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import roy.springbatch.framework.BaseWriter;
import java.util.Map;
@Component
@Scope("prototype")
public class BatChunkWriter extends BaseWriter<Map<String, String>> {
@Override
public void doWrite(Map<String, String> map, JobParameters params,
ExecutionContext stepContext) throws Exception {
System.out.println("--->Writer Execute: write to DB:" + map.toString());
}
}
4. batChunk/BatChunk.java
package roy.springbatch.batChunk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import roy.springbatch.framework.BaseModule;
import roy.springbatch.framework.BatchAnnotation;
@BatchAnnotation
public class BatChunk extends BaseModule {
private static final Logger log = LoggerFactory.getLogger(BatChunk.class);
private static final String MODULE_NAME = "BATCHCHUNK";
public static void main(String[] args) {
try {
run(BatChunk.class, MODULE_NAME, args);
} catch (Exception e) {
log.error(MODULE_NAME + " failed.");
System.exit(1);
}
}
}
5. batChunk/BatChunkConfiguration.java
package roy.springbatch.batChunk;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.transaction.PlatformTransactionManager;
import roy.springbatch.batChunk.listener.BatChunkListener;
import roy.springbatch.batChunk.reader.BatChunkReader;
import roy.springbatch.batChunk.writer.BatChunkWriter;
import java.util.Map;
@Configuration
@EnableBatchProcessing
public class BatChunkConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private BatChunkWriter batChunkWriter;
@Autowired
private PlatformTransactionManager dbWriteManager;
@Bean
@Scope("prototype")
public ItemReader<Map<String, String>> readerByMyBatis() {
return new BatChunkReader();
}
@Bean
public Job insertJob(BatChunkListener listener){
String jobName = "InsertJob:" + System.currentTimeMillis();
return jobBuilderFactory.get(jobName).start(stepInsert()).listener(listener).build();
}
@Bean
public Step stepInsert(){
return stepBuilderFactory.get("stepInsert")
.<Map<String, String>, Map<String, String>>chunk(1)
.reader(readerByMyBatis())
.writer(batChunkWriter)
.transactionManager(dbWriteManager)
.allowStartIfComplete(true)
.build();
}
}
6. 调用方法:
在pom.xml中定义了2个start-class,用下面的方法指定启动类:
>java -cp batch-0.0.1.jar -Dloader.main=roy.springbatch.batChunk.BatChunk
org.springframework.boot.loader.PropertiesLauncher
6. 测试:
三. 代码下载:SpringBatch Sample