您现在的位置是:首页 >学无止境 >SpringBatch的两种实现方式: Tasklet 和 Chunk网站首页学无止境

SpringBatch的两种实现方式: Tasklet 和 Chunk

Royi666 2024-07-11 08:06:33
简介:SpringBatch的两种实现方式: Tasklet 和 Chunk

直接上代码

■ 共通部分:

1. 代码结构

 

 2. pom.xml

    <!-- Build properties: source/report encodings, Java level and Spring Cloud release train. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Edgware.SR2</spring-cloud.version>
        <!-- NOTE(review): start-class is declared twice, once per entry point. Presumably only
             one of the two values is effective, which is why the article selects the main class
             at launch time with -Dloader.main; confirm against the build output. -->
        <start-class>roy.springbatch.batTasklet.BatTasklet</start-class>
        <start-class>roy.springbatch.batChunk.BatChunk</start-class>
    </properties>
        <!-- Spring Boot starter that brings in Spring Batch. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

3. framework/BatchAnnotation.java

package roy.springbatch.framework;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScans;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.PropertySource;

import java.lang.annotation.*;

/**
 * Composite annotation placed on the batch entry-point classes.
 *
 * <p>It bundles, as meta-annotations, the Spring Boot application setup (with
 * {@link DataSourceAutoConfiguration} excluded), the import of
 * {@link SimpleBatchConfiguration}, {@link EnableBatchProcessing}, component scanning
 * (including the {@code roy.springbatch.framework} package) and the
 * {@code classpath:config/jdbc-dev.properties} property source.
 */
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@Import(SimpleBatchConfiguration.class)
@EnableBatchProcessing
@ComponentScan
@ComponentScans(@ComponentScan("roy.springbatch.framework"))
@PropertySource("classpath:config/jdbc-dev.properties")
public @interface BatchAnnotation {
}

4. framework/BaseModule.java

package roy.springbatch.framework;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;

import java.util.HashMap;
import java.util.Map;

/**
 * Base class for the batch entry points.
 *
 * <p>Provides a static launcher that boots a {@link SpringApplication} for a module class,
 * and a {@link CommandLineRunner} callback that logs every command-line argument.
 */
public abstract class BaseModule implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(BaseModule.class);

    /**
     * Boots a Spring application for the given module class.
     *
     * <p>Before the application runs, the number of arguments and (when present) the first
     * argument are registered as the default properties {@code argsLength} and
     * {@code targetDate}.
     *
     * @param module    class handed to the {@link SpringApplication} constructor
     * @param batchName name of the batch; accepted for callers but not used here
     * @param args      command-line arguments, forwarded unchanged to the application
     * @throws Exception if application startup fails
     */
    public static void run(Class<? extends BaseModule> module, String batchName, String[] args)
        throws Exception {
        SpringApplication app = new SpringApplication(module);
        app.setDefaultProperties(retrieveArgs(args));
        app.run(args);
    }

    /**
     * Derives the default properties from the command-line arguments.
     *
     * @param args command-line arguments
     * @return map holding {@code argsLength} and, if at least one argument was given,
     *         {@code targetDate} set to the first argument
     */
    private static Map<String, Object> retrieveArgs(String[] args){
        Map<String, Object> param = new HashMap<>();
        param.put("argsLength", args.length);
        if (args.length > 0){
            param.put("targetDate", args[0]);
        }
        return param;
    }

    /**
     * Logs each argument the application was started with.
     *
     * @param args application arguments; a {@code null} array is ignored
     */
    @Override
    public void run(String... args) throws Exception{
        if (args == null){
            return;
        }
        for (String arg : args){
            // Parameterized form: the message is only assembled when INFO is enabled.
            log.info("execute module with argument : {}", arg);
        }
    }
}

5. framework/BaseWriter.java

package roy.springbatch.framework;

import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Base item writer that remembers the {@link StepExecution} it receives before the step
 * and passes every item to {@link #doWrite(Object, JobParameters, ExecutionContext)}
 * together with the job parameters and the step's execution context.
 *
 * @param <T> type of the items being written
 */
public abstract class BaseWriter<T> implements ItemWriter<T> {

    /** Step execution stored by {@link #saveStepExecution(StepExecution)}. */
    protected StepExecution stepExecution;

    /**
     * Writes a single item.
     *
     * @param item        the item to write
     * @param params      job parameters taken from the stored step execution
     * @param stepContext execution context taken from the stored step execution
     * @throws Exception if the item cannot be written
     */
    public abstract void doWrite(T item, JobParameters params, ExecutionContext stepContext) throws Exception;

    /**
     * Stores the step execution for later use by {@link #write(List)}.
     *
     * @param stepExecution the step execution to keep
     */
    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution){
        this.stepExecution = stepExecution;
    }

    /**
     * Hands each item, in list order, to {@code doWrite}.
     *
     * @param items the items to write
     * @throws Exception whatever {@code doWrite} throws for an item
     */
    @Override
    public void write(List<? extends T> items) throws Exception {
        final JobParameters jobParameters = stepExecution.getJobParameters();
        final ExecutionContext context = stepExecution.getExecutionContext();

        for (final T element : items){
            doWrite(element, jobParameters, context);
        }
    }
}

一. Tasklet

1. batTasklet/BatTasklet.java

package roy.springbatch.batTasklet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import roy.springbatch.framework.BaseModule;
import roy.springbatch.framework.BatchAnnotation;

/**
 * Entry point of the tasklet sample batch.
 */
@BatchAnnotation
public class BatTasklet extends BaseModule {

    private static final Logger log = LoggerFactory.getLogger(BatTasklet.class);

    private static final String MODULE_NAME = "BATCHTASKLET";

    /**
     * Launches the batch through {@link BaseModule#run(Class, String, String[])} and exits
     * the JVM with status 1 if the launch throws.
     *
     * @param args command-line arguments forwarded to the launcher
     */
    public static void main(String[] args) {
        try {
            run(BatTasklet.class, MODULE_NAME, args);
        } catch (Exception e) {
            // Hand the exception to the logger so the cause and stack trace are recorded.
            log.error(MODULE_NAME + " failed.", e);
            System.exit(1);
        }
    }
}

2. batTasklet/BatTaskletConfiguration.java

package roy.springbatch.batTasklet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import roy.springbatch.framework.BaseModule;

/**
 * Job configuration of the tasklet sample: two tasklet-based steps and one job that runs
 * them in sequence.
 */
@Configuration
@EnableBatchProcessing
public class BatTaskletConfiguration extends BaseModule {

    private static final Logger log = LoggerFactory.getLogger(BatTaskletConfiguration.class);

    private static final String JOB_NAME = "step-tasklet-job";
    private static final String FIRST_STEP_NAME = "step1";
    private static final String SECOND_STEP_NAME = "step2";

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private Tasklet tasklet1;

    @Autowired
    private Tasklet tasklet2;

    /**
     * Declares the step named {@code step1}, which runs {@code tasklet1}.
     *
     * @return the first step of the job
     */
    @Bean
    public Step step1(){
        return taskletStep(FIRST_STEP_NAME, tasklet1);
    }

    /**
     * Declares the step named {@code step2}, which runs {@code tasklet2}.
     *
     * @return the second step of the job
     */
    @Bean
    public Step step2(){
        return taskletStep(SECOND_STEP_NAME, tasklet2);
    }

    /**
     * Declares the job {@code step-tasklet-job}: {@code step1} followed by {@code step2},
     * with a {@link RunIdIncrementer} as job parameters incrementer.
     *
     * @return the assembled job
     */
    @Bean
    public Job job(){
        return jobBuilderFactory
                .get(JOB_NAME)
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .next(step2())
                .build();
    }

    /**
     * Builds a tasklet step with {@code allowStartIfComplete} switched on.
     *
     * @param stepName name given to the step
     * @param tasklet  tasklet the step executes
     * @return the built step
     */
    private Step taskletStep(String stepName, Tasklet tasklet){
        return stepBuilderFactory
                .get(stepName)
                .allowStartIfComplete(true)
                .tasklet(tasklet)
                .build();
    }

}

3. batTasklet/Task1.java (具体想要做的事情写在Task里面)

package roy.springbatch.batTasklet;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Holder of the {@code tasklet1} bean definition.
 */
@Component
public class Task1 {

    /**
     * Creates the first tasklet, which prints a line with the current time in milliseconds.
     *
     * @return a tasklet that prints once and reports {@link RepeatStatus#FINISHED}
     */
    @Bean
    public Tasklet tasklet1(){
        return (contribution, context) -> {
            System.out.println("--->Tasklet 1 Execute:" + System.currentTimeMillis());
            return RepeatStatus.FINISHED;
        };
    }
}

4. batTasklet/Task2.java

package roy.springbatch.batTasklet;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Holder of the {@code tasklet2} bean definition.
 */
@Component
public class Task2 {

    /**
     * Creates the second tasklet, which prints a line with the current time in milliseconds.
     *
     * @return a tasklet that prints once and reports {@link RepeatStatus#FINISHED}
     */
    @Bean
    public Tasklet tasklet2(){
        return (contribution, context) -> {
            System.out.println("--->Tasklet 2 Execute:" + System.currentTimeMillis());
            return RepeatStatus.FINISHED;
        };
    }
}

5. 调用方法:

在pom.xml中定义了2个start-class,用下面的方法指定启动类:

>java -cp batch-0.0.1.jar -Dloader.main=roy.springbatch.batTasklet.BatTasklet org.springframework.boot.loader.PropertiesLauncher

6. 测试:

二. Chunk

1. batChunk/listener/BatChunkListener.java

package roy.springbatch.batChunk.listener;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.stereotype.Component;

/**
 * Job listener of the chunk sample; prints a marker line before and after the job.
 */
@Component
public class BatChunkListener extends JobExecutionListenerSupport {

    private static final String BEFORE_JOB_MESSAGE = "--->BeforeJob Execute";
    private static final String AFTER_JOB_MESSAGE = "--->AfterJob Execute";

    /**
     * Prints the "before job" marker.
     *
     * @param jobExecution the job execution; not inspected
     */
    @Override
    public void beforeJob(JobExecution jobExecution){
        System.out.println(BEFORE_JOB_MESSAGE);
    }

    /**
     * Prints the "after job" marker.
     *
     * @param jobExecution the job execution; not inspected
     */
    @Override
    public void afterJob(JobExecution jobExecution){
        System.out.println(AFTER_JOB_MESSAGE);
    }
}

2. batChunk/reader/BatChunkReader.java

package roy.springbatch.batChunk.reader;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import java.util.HashMap;
import java.util.Map;

/**
 * Sample reader that produces three identical items and then reports the end of input.
 *
 * <p>Each item is a map with the entries {@code A1=1}, {@code A2=2} and {@code A3=3}.
 */
public class BatChunkReader implements ItemReader<Map<String, String>> {

    /** Number of items handed out before {@link #read()} starts returning {@code null}. */
    private static final int MAX_ITEMS = 3;

    /** Number of items handed out so far. */
    private int stepCount = 0;

    /**
     * Returns the next sample item.
     *
     * @return a newly created map for each of the first three calls, {@code null} afterwards
     */
    @Override
    public Map<String, String> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        if (stepCount >= MAX_ITEMS){
            return null;
        }

        stepCount++;
        System.out.println("--->Reader Execute: read from DB");

        // A fresh map per call, so items collected into the same chunk never share one
        // mutable instance.
        Map<String, String> item = new HashMap<>();
        item.put("A1", "1");
        item.put("A2", "2");
        item.put("A3", "3");
        return item;
    }
}

3. batChunk/writer/BatChunkWriter.java

package roy.springbatch.batChunk.writer;

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import roy.springbatch.framework.BaseWriter;

import java.util.Map;

/**
 * Sample writer of the chunk job; prints every item it receives.
 */
@Component
@Scope("prototype")
public class BatChunkWriter extends BaseWriter<Map<String, String>> {

    /**
     * Prints the item's string form to standard output.
     *
     * @param map         the item to write
     * @param params      job parameters; not used
     * @param stepContext step execution context; not used
     */
    @Override
    public void doWrite(Map<String, String> map, JobParameters params,
                        ExecutionContext stepContext) throws Exception {

        final String line = "--->Writer Execute: write to DB:" + map.toString();
        System.out.println(line);
    }
}

4. batChunk/BatChunk.java

package roy.springbatch.batChunk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import roy.springbatch.framework.BaseModule;
import roy.springbatch.framework.BatchAnnotation;

/**
 * Entry point of the chunk sample batch.
 */
@BatchAnnotation
public class BatChunk extends BaseModule {

    private static final Logger log = LoggerFactory.getLogger(BatChunk.class);

    private static final String MODULE_NAME = "BATCHCHUNK";

    /**
     * Launches the batch through {@link BaseModule#run(Class, String, String[])} and exits
     * the JVM with status 1 if the launch throws.
     *
     * @param args command-line arguments forwarded to the launcher
     */
    public static void main(String[] args) {
        try {
            run(BatChunk.class, MODULE_NAME, args);
        } catch (Exception e) {
            // Hand the exception to the logger so the cause and stack trace are recorded.
            log.error(MODULE_NAME + " failed.", e);
            System.exit(1);
        }
    }
}

5. batChunk/BatChunkConfiguration.java

package roy.springbatch.batChunk;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.transaction.PlatformTransactionManager;
import roy.springbatch.batChunk.listener.BatChunkListener;
import roy.springbatch.batChunk.reader.BatChunkReader;
import roy.springbatch.batChunk.writer.BatChunkWriter;

import java.util.Map;

/**
 * Job configuration of the chunk sample: a reader bean, one chunk-oriented step and the
 * job that runs it.
 */
@Configuration
@EnableBatchProcessing
public class BatChunkConfiguration {

    private static final String JOB_NAME_PREFIX = "InsertJob:";
    private static final String STEP_NAME = "stepInsert";
    private static final int CHUNK_SIZE = 1;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private BatChunkWriter batChunkWriter;

    @Autowired
    private PlatformTransactionManager dbWriteManager;

    /**
     * Declares the prototype-scoped reader bean.
     *
     * @return a new {@link BatChunkReader}
     */
    @Bean
    @Scope("prototype")
    public ItemReader<Map<String, String>> readerByMyBatis() {
        return new BatChunkReader();
    }

    /**
     * Declares the insert job. Its name is {@code InsertJob:} followed by the current time
     * in milliseconds; it starts with {@link #stepInsert()} and registers the listener.
     *
     * @param listener job execution listener to attach
     * @return the assembled job
     */
    @Bean
    public Job insertJob(BatChunkListener listener){
        return jobBuilderFactory
                .get(JOB_NAME_PREFIX + System.currentTimeMillis())
                .start(stepInsert())
                .listener(listener)
                .build();
    }

    /**
     * Declares the step {@code stepInsert}: chunk size 1, reading from
     * {@link #readerByMyBatis()}, writing through the injected writer, using the injected
     * transaction manager, with {@code allowStartIfComplete} switched on.
     *
     * @return the built step
     */
    @Bean
    public Step stepInsert(){
        return stepBuilderFactory
                .get(STEP_NAME)
                .<Map<String, String>, Map<String, String>>chunk(CHUNK_SIZE)
                .reader(readerByMyBatis())
                .writer(batChunkWriter)
                .transactionManager(dbWriteManager)
                .allowStartIfComplete(true)
                .build();
    }
}

6. 调用方法:

在pom.xml中定义了2个start-class,用下面的方法指定启动类:

>java -cp batch-0.0.1.jar -Dloader.main=roy.springbatch.batChunk.BatChunk org.springframework.boot.loader.PropertiesLauncher

7. 测试:

三. 代码下载:SpringBatch Sample

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。