Quartz integration in JEE

Why Quartz? motivation

Nowadays everybody talks about cloud and microservices. The JEE container isn’t that sexy anymore. With microprofile.io the JEE world started to move toward a more lightweight structure. We should also mention Docker and friends, which changed the way we deploy and run applications.

Long story short, we have to keep in mind that JEE timers depend on the admin server, so clustered timers just won’t really work anymore in such a setup. Not to mention that the normal JEE timers are pretty limited in their feature set: even simple job triggers aren’t possible out of the box.

Bring me to the code.

Steps for the integration

  1. Configure Quartz Scheduler
  2. Configure CDI Job Factory
  3. Register our timers
Note: If you are already using DeltaSpike, you might want to use its scheduler integration directly.

Configure Quartz Scheduler

Select the right job store

In general, we have several ways to integrate Quartz. Going through the documentation, we see that we have to choose between the following job stores:

  1. RAMJobStore
  2. JobStoreTX
  3. JobStoreCMT

For now we will focus on a job store backed by a normal database. We cannot really choose JobStoreTX, as it wouldn’t join our application transaction when creating new triggers. This means that if we accept a command during a REST call and also want to create a Quartz trigger that does the heavy lifting later on, we would lose the ability to roll back the trigger and the modification of our data in one logical step. We might end up reverting all changes to the data but still creating the trigger, which then e.g. won’t find the entities we created or modified for it.

For unit testing the RAMJobStore is sufficient. Luckily it is the default, so if we don’t specify anything we get it anyway.

Configure JobStoreCMT

The CMT job store requires two DB connection pools. The first one joins our application transaction, which means we can simply provide the application data source.

The second connection pool has to be standalone, which means it must not join the transaction of the first pool. We can either create a pool in the container and disable its transaction support:

JDBC Connection Pool non-transactional

or create a non-container-managed pool, which wouldn’t be that nice but is possible.

ConnectionProvider

Nevertheless, we have to bridge the JEE world to the Quartz world using a small class implementing the ConnectionProvider:

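import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import org.quartz.utils.ConnectionProvider;

// bridges a container data source to Quartz, the constructor is generated by Lombok
@AllArgsConstructor
public class SimpleConnectionProvider implements ConnectionProvider {
    @NonNull
    private final DataSource dataSource;
    @Override
    public Connection getConnection() throws SQLException {
        return dataSource.getConnection();
    }
    @Override
    public void shutdown() {
        // nothing to do, the container owns the pool
    }
    @Override
    public void initialize() {
        // nothing to do, the data source is already set up
    }
}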

ThreadPool

Quartz comes with its own code to manage threads. As with the database connection, we also have to tell Quartz to use the managed executor service here. Furthermore, Quartz allows us to limit the number of threads.

public class SimpleQuartzJeeThreadPool implements ThreadPool {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleQuartzJeeThreadPool.class);

    private final ManagedExecutorService executorService;
    private final List<WeakReference<Future<?>>> tasks = Collections.synchronizedList(new ArrayList<>());
    /** the max pool size, could also be the max size of the custom executor services created */
    private final int poolSize;
    // volatile, as the flag is written and read by different threads
    private volatile boolean shutdown = false;

    public SimpleQuartzJeeThreadPool(ManagedExecutorService executorService, int poolSize) {
        this.executorService = executorService;
        this.poolSize = poolSize;
        Objects.requireNonNull(this.executorService, "ManagedExecutorService cannot be null.");
    }
 
    @Override
    public boolean runInThread(Runnable runnable) {
        if (shutdown) return false;
        try {
            tasks.add(new WeakReference<>(executorService.submit(runnable)));
            return true;
        } catch (RejectedExecutionException e) {
            LOG.info("runInThread was rejected, the pool capacity is exceeded! {}", e.getMessage());
            return false;
        }
    }

    @Override
    public int blockForAvailableThreads() {
        if (shutdown) return -1;
        try {
            // purge finished tasks on an executor thread and wait for the result,
            // so the free capacity reported to Quartz below is up to date
            this.executorService.submit(this::purge).get();

            return poolSize - tasks.size();
        } catch (Exception e) {
            LOG.warn("blockForAvailableThreads failed to purge queue!", e);
            return poolSize - tasks.size();
        }
    }
  
    /**
     * Removes all finished (or garbage collected) tasks and returns the removed count.
     * @return the removed job count
     */
    public int purge() {
        // a synchronizedList needs manual locking for compound operations
        synchronized (tasks) {
            final int before = tasks.size();
            tasks.removeIf(ref -> {
                // read the referent only once, it may be collected between two get() calls
                final Future<?> task = ref.get();
                return task == null || task.isDone();
            });
            return before - tasks.size();
        }
    }

    @Override
    public void initialize() {
    }
    @Override
    public void shutdown(boolean waitForJobsToComplete) {
        this.shutdown = true;
    }
    @Override
    public int getPoolSize() {
        return poolSize;
    }
    @Override
    public void setInstanceId(String schedInstId) {
    }
    @Override
    public void setInstanceName(String schedName) {
    }
}
Note: The executor created for Quartz shouldn’t have any queue capacity, as Quartz manages the queue itself.
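
To see why the queue capacity matters, here is a minimal sketch using plain java.util.concurrent instead of a managed executor. It assumes a pool backed by a SynchronousQueue, which has no capacity at all, so the first task beyond the thread limit is rejected right away. That rejection is exactly the signal runInThread turns into false. The class and method names are hypothetical, and in a JEE server the equivalent setting lives in the executor configuration of the container.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a managed executor without any queue capacity. */
final class NoQueueExecutorDemo {

    /** Creates a pool with a fixed thread limit; a SynchronousQueue only hands tasks over, it never stores them. */
    static ThreadPoolExecutor newPool(int maxThreads) {
        return new ThreadPoolExecutor(maxThreads, maxThreads, 30, TimeUnit.SECONDS, new SynchronousQueue<>());
    }

    /** Submits blocking tasks until the pool rejects one and returns how many were accepted. */
    static int acceptedBeforeRejection(int maxThreads) {
        final ThreadPoolExecutor pool = newPool(maxThreads);
        final CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            // try one task more than the pool has threads
            for (int i = 0; i <= maxThreads; i++) {
                pool.execute(() -> awaitQuietly(release));
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // expected: all threads are busy and nothing can be queued
        } finally {
            // let the blocked tasks finish and stop the pool
            release.countDown();
            pool.shutdown();
        }
        return accepted;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling NoQueueExecutorDemo.acceptedBeforeRejection(2) accepts two tasks and rejects the third. With a queue in between, the third task would be accepted silently and Quartz would believe a thread is working on a job that is in fact still waiting.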

Put it together

Which leads us to the following configuration:

@Startup
@Singleton
public class TimerConfiguration {
    // this data source should be non transactional in the container
    private static final String QUARTZ_DS = "jdbc/quartz-datasource";
    @Resource(lookup = QUARTZ_DS)
    private DataSource quartzDataSource;
    // the default application datasource
    private static final String APP_DS = "jdbc/app-datasource";
    @Resource(lookup = APP_DS)
    private DataSource appDataSource;
    
    @Inject InjectionJobFactory jobFactory;
    // executor service Quartz uses to run our jobs
    @Resource(name = "concurrent/quartz-executor")
    private ManagedExecutorService executorService;

    private Scheduler scheduler;
    
    @PostConstruct
    public void start() {
        try {
            // Get scheduler and start it
            final DirectSchedulerFactory schedulerFactory = DirectSchedulerFactory.getInstance();
            // provide the data sources
            DBConnectionManager.getInstance()
                    .addConnectionProvider(APP_DS, new SimpleConnectionProvider(appDataSource));
            DBConnectionManager.getInstance()
                    .addConnectionProvider(QUARTZ_DS, new SimpleConnectionProvider(quartzDataSource));
            
            JobStoreCMT jobStore = new JobStoreCMT();
            jobStore.setInstanceName("quartz-job-example-store");
            jobStore.setDataSource(APP_DS);
            jobStore.setNonManagedTXDataSource(QUARTZ_DS);
            // use the JEE executor service
            jobStore.setThreadExecutor(new ThreadExecutor() {
                @Override
                public void execute(Thread thread) {
                    executorService.execute(thread);
                }
                @Override
                public void initialize() {
                }
            });
            jobStore.setIsClustered(true);
            jobStore.setDriverDelegateClass("org.quartz.impl.jdbcjobstore.PostgreSQLDelegate");
            // note: the id matters, as it defines whether the scheduler is seen as its own node ...
            schedulerFactory.createScheduler("JEE-QUARTZ", "JEE-QUARTZ-ID", new SimpleQuartzJeeThreadPool(executorService, 20), jobStore);
            
            scheduler = schedulerFactory.getScheduler("JEE-QUARTZ");

            // Use the CDI managed job factory
            scheduler.setJobFactory(jobFactory);

            // Start scheduler
            scheduler.start();

        } catch (Exception e) {
            throw new RuntimeException("Failed to configure and start quartz.", e);
        }
    }
}
Note: The scheduler factory lets us set the instance ID of the Quartz scheduler. If two nodes use the same ID, they will not balance the load between each other. Make sure each node has its own ID.
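
One way to get a different ID on every node is to derive it at startup instead of hard coding it. The following is only a sketch under assumptions: the helper class is hypothetical, and it assumes that host name plus a random suffix is unique enough for the cluster. Whether the ID should stay stable across restarts depends on the setup, so treat the random part as a choice, not a requirement.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

/** Hypothetical helper that derives a per-node scheduler instance id. */
final class SchedulerInstanceIds {

    /** Builds an id of the form prefix-host-random, which differs between nodes and between restarts. */
    static String create(String prefix) {
        return prefix + "-" + hostName() + "-" + UUID.randomUUID();
    }

    /** Resolves the local host name and falls back to a constant if it cannot be determined. */
    static String hostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown-host";
        }
    }
}
```

In the configuration above, the result of SchedulerInstanceIds.create("JEE-QUARTZ") would replace the fixed "JEE-QUARTZ-ID" argument of createScheduler, while the scheduler name stays the same on all nodes.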

Configure CDI Job Factory

The default behavior of Quartz is to create a new instance of the class we register as a job. In most cases, this is not what we want. Usually, we want to use our CDI beans or JEE EJBs (or, for the Spring folks, Spring beans). So we have to tell Quartz how to find our beans.

Injection JobFactory

The easiest and cleanest way is to provide an adapter to our CDI world. We simply inject all beans that implement the Job interface and select the right one based on the job class in the job detail we define during job registration.

@ApplicationScoped
public class InjectionJobFactory implements JobFactory {    
    @Inject @Any
    private Instance<Job> jobs;
    
    @Override
    public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
        final Class<? extends Job> clazz = bundle.getJobDetail().getJobClass();
        final Instance<? extends Job> bean = jobs.select(clazz);
        if (bean.isUnsatisfied()) throw new SchedulerException("No job bean of type " + clazz + " found.");
        return bean.get();
    }
}
Note: Of course we could use additional values to do further filtering, e.g. based on annotations.
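
As a rough idea of what such annotation-based filtering could look like, here is a small sketch using only reflection. Everything in it is hypothetical: the @SchedulableJob marker, its group attribute and the two example classes are made up for illustration, and in a real CDI setup a qualifier passed to select(...) would probably be the more natural choice.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical marker for job classes the scheduler is allowed to start. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SchedulableJob {
    /** Logical group of the job, only used for this illustration. */
    String group() default "default";
}

/** Example class carrying the marker. */
@SchedulableJob(group = "reports")
final class ReportJob { }

/** Example class without the marker. */
final class LegacyJob { }

final class JobFilter {

    /** Returns true if the class carries the marker and belongs to the given group. */
    static boolean accepts(Class<?> jobClass, String group) {
        final SchedulableJob marker = jobClass.getAnnotation(SchedulableJob.class);
        return marker != null && marker.group().equals(group);
    }
}
```

A check like JobFilter.accepts(clazz, "reports") could run in newJob before the bean lookup, so that unmarked classes are refused with a SchedulerException instead of being executed.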

Register our timers

Quartz provides the Job interface that our timers have to implement. As soon as this is done, we can register a timer:

// Create a Quartz job to run
JobDetail jeeJob = JobBuilder.newJob(CdiTimer.class)
        .withIdentity(CdiTimer.class.getSimpleName())
        .build();

// Create a trigger which fires the job every 10 seconds
Trigger trigger = TriggerBuilder.newTrigger()
                 .withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?"))
                 .forJob(jeeJob)
                 .build();

// Register Job and Trigger with the scheduler
scheduler.scheduleJob(jeeJob, trigger);
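
The cron expression is easy to misread, because Quartz expressions start with a seconds field, followed by minutes, hours, day of month, month and day of week, plus an optional year. The following sketch only labels the parts of an expression so the positions become visible. The helper class is hypothetical and does not validate the values the way Quartz does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that labels the fields of a Quartz cron expression. */
final class CronFieldNames {

    // Quartz cron expressions start with seconds, the trailing year field is optional
    private static final String[] NAMES = {
        "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"
    };

    /** Splits the expression on whitespace and maps each part to its field name. */
    static Map<String, String> label(String expression) {
        final String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > NAMES.length) {
            throw new IllegalArgumentException("Expected 6 or 7 fields but got " + parts.length);
        }
        final Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(NAMES[i], parts[i]);
        }
        return fields;
    }
}
```

For "0/10 * * * * ?" the seconds field is 0/10, which means every 10 seconds starting at second 0. A trigger firing every five minutes would use "0 0/5 * * * ?" instead.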

CDI Timer

Using CDI beans as timers is straightforward: add the interface, make sure they are annotated as application scoped, done.

import javax.enterprise.context.ApplicationScoped;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
// our simple CDI timer bean
@ApplicationScoped // timers run without a request or session scope ...
public class CdiTimer implements Job {

    @Override
    public void execute(JobExecutionContext context) {
        // the CDI timer code
    }   
}

EJBs as timer beans

EJBs are a bit different. As soon as we add the interface, you might encounter the problem that @Inject doesn’t work anymore. Long story short: just tell JEE that it is just a local bean with @LocalBean and all will be good:

import javax.ejb.LocalBean;
import javax.ejb.Stateless;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
// tells JEE to not mess around because we use an interface, this is not a remote EJB!
@LocalBean 
@Stateless
public class EjbTimer implements Job {

    @Override
    public void execute(JobExecutionContext context) {
        // the EJB timer code
    }
}

Sidenotes

Not enough? Well, here are some other side notes.

Manage dependent scope beans

Just for completeness, but not as a recommendation: it is also possible to use CDI beans without a scope annotation as timer beans. Of course, we then have to handle their lifecycle ourselves and destroy them after they have been used. The JBoss documentation points us to the WeldInstance class, which allows us to check the scope and later also to destroy the bean.

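// important is the usage of WeldInstance instead of Instance
@Inject @Any
WeldInstance<Job> jobs;

// the handler gives access to the bean metadata and to the instance itself
// (clazz is the job class from the TriggerFiredBundle, as in the InjectionJobFactory)
WeldInstance.Handler<? extends Job> handler = jobs.select(clazz).getHandler();
Job job = handler.get();

// which would allow us to add the destroy code after the timer was used
// problem of course is to find the correct point in Quartz to position this part
if (Dependent.class.equals(handler.getBean().getScope())) {
    // Destroy only dependent CDI beans
    handler.destroy();
}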

Creating the Quartz tables

Quartz provides two ways to set up the tables out of the box:

  1. SQL to be used e.g. with Flyway
  2. XML to be used with Liquibase

The second one has the charm that it also works in tests out of the box, as Liquibase supports multiple databases.

So we can copy liquibase.quartz.init.xml into our src/main/resources and execute the following code before the initialization of Quartz:

try (Connection c = dataSource.getConnection()) {
    Liquibase l = new Liquibase(
            "/liquibase.quartz.init.xml", 
            new ClassLoaderResourceAccessor(getClass().getClassLoader()),
            DatabaseFactory.getInstance().findCorrectDatabaseImplementation(new JdbcConnection(c)));
    l.update("QUARTZ-SCHEMA");
} catch (Exception e) {
    throw new RuntimeException("Failed to init quartz schema.", e);
}

Paul Sterl has written 21 articles
