
I have a Spring Cloud Task that loads data from SQL Server into Cassandra DB, and it will run on Spring Cloud Data Flow.

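One requirement of Spring Cloud Task is a relational database for persisting metadata such as task execution state. I do not want to use either of the databases above for that; I have to use a third database for persistence. However, Spring Cloud Task appears to pick up the SQL Server datasource properties from application.properties automatically. How can I specify a different database for task state persistence?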

My current properties:

spring.datasource.url=jdbc:sqlserver://iphost;databaseName=dbname
spring.datasource.username=user
spring.datasource.password=password
spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver
spring.jpa.show-sql=false
#spring.jpa.hibernate.dialect=org.hibernate.dialect.SQLServer2012Dialect
spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
spring.jpa.hibernate.ddl-auto=none

spring.data.cassandra.contact-points=ip
spring.data.cassandra.port=9042
spring.data.cassandra.username=username
spring.data.cassandra.password=password
spring.data.cassandra.keyspace-name=mykeyspace
spring.data.cassandra.schema-action=CREATE_IF_NOT_EXISTS

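Update 1: I added the code below to point to the third database, as Michael Minella suggested. Spring Task can now connect to this database and persist its state. But now my batch job's source query also connects to this database. The only thing I changed was adding a datasource for the task.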

spring.task.datasource.url=jdbc:postgresql://host:5432/testdb?stringtype=unspecified
spring.task.datasource.username=user
spring.task.datasource.password=password
spring.task.datasource.driverClassName=org.postgresql.Driver

@Configuration
public class DataSourceConfigs {

    @Bean(name = "taskDataSource")
    @ConfigurationProperties(prefix="spring.task.datasource")
    public DataSource getDataSource() {
        return DataSourceBuilder.create().build();
    }   
}


@Configuration
public class DDTaskConfigurer extends DefaultTaskConfigurer{


    @Autowired
    public DDTaskConfigurer(@Qualifier("taskDataSource") DataSource dataSource) {
        super(dataSource);

    }

}

Update #2:

@Component
@StepScope
public class MyItemReader extends RepositoryItemReader<Scan> implements InitializingBean{

    @Autowired
    private ScanRepository repository;
    private Integer lastScanIdPulled = null;

    public MyItemReader(Integer _lastIdPulled) {
        super();        
        if(_lastIdPulled == null || _lastIdPulled <=0 ){
            lastScanIdPulled = 0;
        } else {
            lastScanIdPulled = _lastIdPulled;
        }
    }



    @PostConstruct
    protected void setUpRepo() {
        final Map<String, Sort.Direction> sorts = new HashMap<>();
        sorts.put("id", Direction.ASC);
        this.setRepository(this.repository);
        this.setSort(sorts);
        this.setMethodName("findByScanGreaterThanId"); 
        List<Object> methodArgs = new ArrayList<Object>();
        System.out.println("lastScanIdpulled >>> " + lastScanIdPulled);
        if(lastScanIdPulled == null || lastScanIdPulled <=0 ){
            lastScanIdPulled = 0;
        }
        methodArgs.add(lastScanIdPulled);
        this.setArguments(methodArgs);
    }


}



@Repository
public interface ScanRepository extends JpaRepository<Scan, Integer> {


    @Query("...")
    Page<Scan> findAllScan(final Pageable pageable);

    @Query("...")
    Page<Scan> findByScanGreaterThanId(int id, final Pageable pageable);

}

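Update #3: If I add a datasource configuration for the repository, I now get the exception below. Before you mention that one of the datasources needs to be declared as Primary: I have already tried that.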

Caused by: java.lang.IllegalStateException: Expected one datasource and found 2
at org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$TaskBatchExecutionListenerAutoconfiguration.taskBatchExecutionListener(TaskBatchAutoConfiguration.java:65) ~[spring-cloud-task-batch-1.0.3.RELEASE.jar:1.0.3.RELEASE]
at org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$TaskBatchExecutionListenerAutoconfiguration$$EnhancerBySpringCGLIB$$baeae6b9.CGLIB$taskBatchExecutionListener$0(<generated>) ~[spring-cloud-task-batch-1.0.3.RELEASE.jar:1.0.3.RELEASE]
at org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$TaskBatchExecutionListenerAutoconfiguration$$EnhancerBySpringCGLIB$$baeae6b9$$FastClassBySpringCGLIB$$5a898c9.invoke(<generated>) ~[spring-cloud-task-batch-1.0.3.RELEASE.jar:1.0.3.RELEASE]
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:228) ~[spring-core-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:358) ~[spring-context-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfigu


@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
  entityManagerFactoryRef = "myEntityManagerFactory",
  basePackages = { "com.company.dd.collector.tool" },
  transactionManagerRef = "TransactionManager"

)
public class ToolDbConfig {

      @Bean(name = "myEntityManagerFactory")
      public LocalContainerEntityManagerFactoryBean 
      myEntityManagerFactory(
        EntityManagerFactoryBuilder builder,
        @Qualifier("ToolDataSource") DataSource dataSource
      ) {
        return builder
          .dataSource(dataSource)
          .packages("com.company.dd.collector.tool")
          .persistenceUnit("tooldatasource")
          .build();
      }


      @Bean(name = "myTransactionManager")
      public PlatformTransactionManager transactionManager(
        @Qualifier("myEntityManagerFactory") EntityManagerFactory 
        entityManagerFactory
      ) {
        return new JpaTransactionManager(entityManagerFactory);
      }
}

@Configuration
public class DataSourceConfigs {


    @Bean(name = "taskDataSource")
    @ConfigurationProperties(prefix="spring.task.datasource")
    public DataSource getDataSource() {
        return DataSourceBuilder.create().build();
    }   

    @Primary
    @Bean(name = "ToolDataSource")
    @ConfigurationProperties(prefix = "tool.datasource")
    public DataSource dataSource() {
      return DataSourceBuilder.create().build();
   }

}

1 Answer


You need to create a TaskConfigurer to specify the DataSource to use. You can read about this interface in the documentation here: https://docs.spring.io/spring-cloud-task/1.1.1.RELEASE/reference/htmlsingle/#features-task-configurer

The javadoc can be found here: https://docs.spring.io/spring-cloud-task/docs/current/apidocs/org/springframework/cloud/task/configuration/TaskConfigurer.html

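Update 1:
When using more than one DataSource, Spring Batch and Spring Cloud Task follow the same paradigm: each has a *Configurer interface that must be used to specify which DataSource to use. For Spring Batch you use BatchConfigurer (typically by just extending DefaultBatchConfigurer), and, as noted above, TaskConfigurer is used in Spring Cloud Task. This is necessary because when there is more than one DataSource, the framework has no way of knowing which one to use.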
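To illustrate why an explicit configurer is needed, here is a minimal plain-Java sketch of the selection problem. It is a simplified model, not the actual Spring Batch or Spring Cloud Task code: the `Configurer` interface, the `resolve` method, and the class name are hypothetical, and data sources are represented only by their JDBC URLs. The bean names and the error message are borrowed from the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class DataSourceSelectionDemo {

    /** Hypothetical stand-in for a *Configurer: it names the data source a framework should use. */
    public interface Configurer {
        String dataSourceName();
    }

    /**
     * Simplified model of a framework lookup: use the configurer's choice when one is given,
     * otherwise fall back to the only registered data source and fail if the choice is ambiguous.
     */
    public static String resolve(Map<String, String> dataSources, Optional<Configurer> configurer) {
        if (configurer.isPresent()) {
            String name = configurer.get().dataSourceName();
            if (!dataSources.containsKey(name)) {
                throw new IllegalStateException("No datasource named " + name);
            }
            return dataSources.get(name);
        }
        if (dataSources.size() != 1) {
            throw new IllegalStateException(
                "Expected one datasource and found " + dataSources.size());
        }
        return dataSources.values().iterator().next();
    }

    public static void main(String[] args) {
        // Two data sources, keyed by the bean names used in the question.
        Map<String, String> dataSources = new LinkedHashMap<>();
        dataSources.put("ToolDataSource", "jdbc:sqlserver://iphost;databaseName=dbname");
        dataSources.put("taskDataSource", "jdbc:postgresql://host:5432/testdb");

        // Without a configurer the lookup is ambiguous and fails.
        try {
            resolve(dataSources, Optional.empty());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // With a configurer the caller states which data source to use.
        Configurer taskConfigurer = () -> "taskDataSource";
        System.out.println(resolve(dataSources, Optional.of(taskConfigurer)));
    }
}
```

In this model each framework would receive its own configurer, so the task metadata and the batch job can point at different databases without either one having to guess.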

answered 2018-02-08T16:30:49.580