task

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: MIT Imports: 11 Imported by: 0

README

task package tutorial

task package implements a job scheduler. Jobs in scheduler can be scheduled to run periodically or once at specific time.

1. basic usages

You can add periodic jobs or run once jobs to the default scheduler, and then start to schedule.

Examples as below:

package main

import (
    "time"
    "github.com/byte-power/gorich/task"
)

func main() {
    jobName1 := "once_job1_name"
    // Add jobName1 to the default scheduler, the job will run immediately after scheduler starts.
    task.Once(jobName1, sum, 10, 20)

    jobName2 := "once_job2_name"
    // Add jobName2 to the default scheduler, the job will run 5 seconds after scheduler starts.
    task.Once(jobName2, sum, 100, 200).Delay(5 * time.Second)

    jobName3 := "periodic_job3_name"
    // Add jobName3 to the default scheduler, the job will run every 2 days at 10:20:30 (in local timezone by default) after scheduler starts.
     _, err := task.Periodic(jobName3, sum, 20, 30).EveryDays(2).AtHourInDay(10, 20, 30)
     if err != nil {
        return
     }

    // Start the default scheduler.
    task.StartScheduler()
}

func sum(a, b int) int {
    return a + b
}

2. monitor job stats

You can also monitor the scheduled jobs via JobStats.

package main

import (
    "fmt"
    "time"
    "github.com/byte-power/gorich/task"
)

func main() {
    jobName1 := "once_job1_name"
    // Add jobName1 to the default scheduler, the job will run immediately after scheduler starts.
    task.Once(jobName1, sum, 10, 20)

    jobName2 := "once_job2_name"
    // Add jobName2 to the default scheduler, the job will run 5 seconds after scheduler starts.
    task.Once(jobName2, sum, 100, 200).Delay(5 * time.Second)
    go monitorScheduler()
    task.StartScheduler()
}

func monitorScheduler() {
    for {
        // handle all job stats
        allJobStats := task.JobStats()
        for jobName, jobStats := range allJobStats {
            fmt.Printf("job %s stat:\n", jobName)
            for _, stat := range jobStats {
                fmt.Println(stat.ToMap())
            }
        }
        time.Sleep(5 * time.Second)
    }
}

func sum(a, b int) int {
    return a + b
}

3. job coordination

When running periodic jobs in multiple servers, you can use Coordinate to coordinate running and avoid unnecessary running.

Notice that Coordinate use a lock that will unlock automatically 5 seconds later, so if the job running interval is less than 5 seconds, some runnings will not be allowed.

package main

import (
    "fmt"
    "time"
    "github.com/byte-power/gorich/task"
)

func main () {
    coordinator := task.NewCoordinatorFromRedis("coordinator1", "localhost:6379")
    // with redis cluster, use:
    // task.NewCoordinatorFromRedisCluster("coordinator2", []string{"localhost:30000", "localhost:30001"})

    // starts two schedulers
    scheduler1 := task.NewScheduler(10)
    scheduler2 := task.NewScheduler(10)

    name := "coordinate_job"
    job1 := scheduler1.AddPeriodicJob(name, sum, 1, 2).EverySeconds(10).SetCoordinate(coordinator)
    job2 := scheduler2.AddPeriodicJob(name, sum, 3, 4).EverySeconds(10).SetCoordinate(coordinator)

    // job1 and job2 will coordinate, only one of them will be scheduled once every 10 seconds
    go scheduler1.Start()
    go scheduler2.Start()

    go monitorJob(job1)
    go monitorJob(job2)


    // stop schedulers after 30 seconds
    time.Sleep(30 * time.Second)

    scheduler1.Stop(false)
    scheduler2.Stop(false)
}

func monitorJob(job task.Job) {
    for {
        jobStats := job.Stats()
        fmt.Printf("job  %s stats:\n", job.Name())
        for _, stat := range jobStats {
            fmt.Println(stat.ToMap())
        }
        time.Sleep(5 * time.Second)
    }
}

func sum(a, b int) int {
    return a + b
}

See more examples here.

Documentation

Overview

Package task implements a task scheduler. One task is mapped to a job and scheduled by a scheduler.

There are two scheduler strategies:

1) run only once at a specific time

2) run periodically

Example
package main

import (
	"fmt"
	"time"

	"github.com/byte-power/gorich/task"
)

func main() {
	// will run immediately after scheduler starts
	task.Once("once_job1_name", sum, 10, 20)

	// will run 5 seconds after scheduler starts
	task.Once("once_job2_name", sum, 100, 200).Delay(5 * time.Second)

	// will run every 2 days at 10:20:30 (in local timezone by default) after scheduler starts
	_, err := task.Periodic("periodic_job3_name", sum, 20, 30).EveryDays(2).AtHourInDay(10, 20, 30)
	if err != nil {
		return
	}

	// will run every Friday at 10:20:30 (in Asia/Shanghai timezone) after scheduler starts
	tz, _ := time.LoadLocation("Asia/Shanghai")
	_, err = task.Periodic("periodic_job4_name", sum, 20, 40).EveryFridays(1).SetTimeZone(tz).AtHourInDay(10, 20, 30)
	if err != nil {
		return
	}

	// will run every 2 hours at 20:30
	_, err = task.Periodic("periodic_job5_name", sum, 10, 20).EveryHours(2).AtMinuteInHour(20, 30)
	if err != nil {
		return
	}

	// will run every 5 minutes at :20
	job, err := task.Periodic("periodic_job6_name", sum, 10, 20).EveryMinutes(5).AtSecondInMinute(20)
	if err != nil {
		return
	}

	// task.StartScheduler method will start the scheduler, it will loop to schedule runnable jobs.
	// you can add more jobs to schedule after call task.StartScheduler.
	// here, to show more use cases, start scheduler in a separate goroutine
	go task.StartScheduler()

	// return job's name: periodic_job6_name
	job.Name()

	// return job's running statistics
	jobStats := job.Stats()
	for _, stat := range jobStats {
		fmt.Println(stat.ToMap())
	}

	// return jobs's latest scheduled time, return time.Time{} if not scheduled yet.
	job.GetLatestScheduledTime()

	// return job count in scheduler
	task.JobCount()

	// handle all job stats
	allJobStats := task.JobStats()
	for jobName, jobStats := range allJobStats {
		fmt.Printf("job %s stat:\n", jobName)
		for _, stat := range jobStats {
			fmt.Println(stat.ToMap())
		}
	}
	// remove job by name in scheduler
	task.RemoveJob(job.Name())

	// remove all jobs in scheduler
	task.RemoveAllJobs()

	// Stop scheduler after 5 seconds
	// set argument to false indicates waiting all running jobs finish before return.
	time.Sleep(5 * time.Second)
	task.StopScheduler(false)
}

func sum(a, b int) int {
	return a + b
}

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrRaceCondition means conflict condition happens when coordinated.
	ErrRaceCondition = errors.New("race condition when coordination")

	// ErrTimeRange means time range error.
	ErrTimeRange = errors.New("time range is invalid")

	// ErrJobTimeout means job's executed time exceeds `jobMaxExecutionDuration`(1 hour).
	ErrJobTimeout = errors.New("job is timeout")

	// ErrJobCronInvalid means job's cron expression is invalid.
	ErrJobCronInvalid = errors.New("job's cron is invalid")

	// ErrNotFunctionType means job's function is not function type.
	ErrNotFunctionType = errors.New("job's function is not function type")
	// ErrFunctionArityNotMatch means function arity(the number of parameters) does not match given arguments
	ErrFunctionArityNotMatch = errors.New("job's function arity does not match given arguments")
)

Functions

func IsCoordinateError added in v1.0.5

func IsCoordinateError(err error) bool

func JobCount

func JobCount() int

JobCount returns the number of jobs in the default scheduler.

func JobStats

func JobStats() map[string][]JobStat

JobStats returns all jobs' statistics in the default scheduler.

func RemoveAllJobs

func RemoveAllJobs()

RemoveAllJobs removes all jobs from the default scheduler.

func RemoveJob

func RemoveJob(name string)

RemoveJob removes a job by name from the default scheduler.

func StartScheduler

func StartScheduler()

StartScheduler starts the default scheduler.

func StopScheduler

func StopScheduler(force bool)

StopScheduler stops the default scheduler.

Types

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator represents a coordinator.

func NewCoordinatorFromRedis

func NewCoordinatorFromRedis(name, address string) *Coordinator

NewCoordinatorFromRedis creates a coordinator based on standalone redis.

func NewCoordinatorFromRedisCluster

func NewCoordinatorFromRedisCluster(name string, addrs []string) *Coordinator

NewCoordinatorFromRedisCluster creates a coordinator based on redis cluster.

type Job

type Job interface {
	Name() string
	Stats() []JobStat
	GetLatestScheduledTime() (time.Time, error)
	IsSchedulable(time.Time) (bool, error)
	// contains filtered or unexported methods
}

Job represents a job

type JobStat

type JobStat struct {
	IsSuccess     bool
	Err           error
	RunDuration   time.Duration
	ScheduledTime time.Time
}

JobStat represents the running statistics of a job.

func (JobStat) ToMap

func (stat JobStat) ToMap() map[string]interface{}

ToMap converts a JobStat struct to a map.

type OnceJob

type OnceJob struct {
	// contains filtered or unexported fields
}

OnceJob represents a job running only once.

func NewOnceJob

func NewOnceJob(name string, function interface{}, params []interface{}) *OnceJob

NewOnceJob creates a OnceJob.

func Once

func Once(name string, function interface{}, params ...interface{}) *OnceJob

Once adds a job to the default scheduler, and the job only run once.

func (*OnceJob) Delay

func (job *OnceJob) Delay(delay time.Duration) *OnceJob

Delay set the delayed duration from now for the current OnceJob.

func (*OnceJob) GetLatestScheduledTime

func (job *OnceJob) GetLatestScheduledTime() (time.Time, error)

GetLatestScheduledTime returns job's latest scheduled time, returns time.Time{} if is not scheduled yet.

func (*OnceJob) Interval added in v1.0.4

func (job *OnceJob) Interval() time.Duration

func (*OnceJob) IsSchedulable added in v1.0.4

func (job *OnceJob) IsSchedulable(t time.Time) (bool, error)

func (*OnceJob) Name

func (job *OnceJob) Name() string

Name returns a job's name.

func (*OnceJob) SetCoordinate

func (job *OnceJob) SetCoordinate(coordinator *Coordinator) *OnceJob

SetCoordinate sets coordinator for the current job.

func (*OnceJob) Stats

func (job *OnceJob) Stats() []JobStat

Stats returns a job's running statistics.

type PeriodicJob

type PeriodicJob struct {
	// contains filtered or unexported fields
}

PeriodicJob represents a job running periodically.

func NewPeriodicJob

func NewPeriodicJob(name string, function interface{}, params []interface{}) *PeriodicJob

NewPeriodicJob creates a PeriodicJob.

func Periodic

func Periodic(name string, function interface{}, params ...interface{}) *PeriodicJob

Periodic add a job to the default scheduler, and the job run periodically.

func (*PeriodicJob) AtHourInDay

func (job *PeriodicJob) AtHourInDay(hour, minute, second int) (*PeriodicJob, error)

AtHourInDay sets hour time for jobs running periodically in days.

func (*PeriodicJob) AtMinuteInHour

func (job *PeriodicJob) AtMinuteInHour(minute, second int) (*PeriodicJob, error)

AtMinuteInHour sets minute time for jobs running periodically in hours.

func (*PeriodicJob) AtSecondInMinute

func (job *PeriodicJob) AtSecondInMinute(second int) (*PeriodicJob, error)

AtSecondInMinute sets second time for jobs running periodically in minutes.

func (*PeriodicJob) EveryDays

func (job *PeriodicJob) EveryDays(day int) *PeriodicJob

EveryDays sets running period in days for the current job.

func (*PeriodicJob) EveryFridays

func (job *PeriodicJob) EveryFridays(week int) *PeriodicJob

EveryFridays sets running period in Fridays for the current job.

func (*PeriodicJob) EveryHours

func (job *PeriodicJob) EveryHours(hour int) *PeriodicJob

EveryHours sets running period in hours for the current job.

func (*PeriodicJob) EveryMinutes

func (job *PeriodicJob) EveryMinutes(minute int) *PeriodicJob

EveryMinutes sets running period in minutes for the current job.

func (*PeriodicJob) EveryMondays

func (job *PeriodicJob) EveryMondays(week int) *PeriodicJob

EveryMondays sets running period in Mondays for the current job.

func (*PeriodicJob) EverySaturdays

func (job *PeriodicJob) EverySaturdays(week int) *PeriodicJob

EverySaturdays sets running period in Saturdays for the current job.

func (*PeriodicJob) EverySeconds

func (job *PeriodicJob) EverySeconds(second int) *PeriodicJob

EverySeconds sets running period in seconds for the current job.

func (*PeriodicJob) EverySundays

func (job *PeriodicJob) EverySundays(week int) *PeriodicJob

EverySundays sets running period in Sundays for the current job.

func (*PeriodicJob) EveryThursdays

func (job *PeriodicJob) EveryThursdays(week int) *PeriodicJob

EveryThursdays sets running period in Thursdays for the current job.

func (*PeriodicJob) EveryTuesdays

func (job *PeriodicJob) EveryTuesdays(week int) *PeriodicJob

EveryTuesdays sets running period in Tuesdays for the current job.

func (*PeriodicJob) EveryWednesdays

func (job *PeriodicJob) EveryWednesdays(week int) *PeriodicJob

EveryWednesdays sets running period in Wednesdays for the current job.

func (*PeriodicJob) GetLatestScheduledTime

func (job *PeriodicJob) GetLatestScheduledTime() (time.Time, error)

GetLatestScheduledTime returns job's latest scheduled time, returns time.Time{} if is not scheduled yet.

func (*PeriodicJob) Interval added in v1.0.4

func (job *PeriodicJob) Interval() time.Duration

func (*PeriodicJob) IsSchedulable added in v1.0.4

func (job *PeriodicJob) IsSchedulable(t time.Time) (bool, error)

func (*PeriodicJob) Name

func (job *PeriodicJob) Name() string

Name returns a job's name.

func (*PeriodicJob) SetCoordinate

func (job *PeriodicJob) SetCoordinate(coordinator *Coordinator) *PeriodicJob

SetCoordinate sets coordinator for the current job.

func (*PeriodicJob) SetTimeZone

func (job *PeriodicJob) SetTimeZone(tz *time.Location) *PeriodicJob

SetTimeZone sets timezone for the current job.

func (*PeriodicJob) Stats

func (job *PeriodicJob) Stats() []JobStat

Stats returns a job's running statistics.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler represents a scheduler.

func NewScheduler

func NewScheduler(workerCount int) Scheduler

NewScheduler create a scheduler, and at most `workerCount` jobs can run concurrently in this scheduler.

func (*Scheduler) AddPeriodicJob

func (scheduler *Scheduler) AddPeriodicJob(name string, function interface{}, params ...interface{}) *PeriodicJob

AddPeriodicJob add a job to the current scheduler, and the job run periodically.

func (*Scheduler) AddRunOnceJob

func (scheduler *Scheduler) AddRunOnceJob(name string, function interface{}, params ...interface{}) *OnceJob

AddRunOnceJob adds a job to the current scheduler, and the job only run once.

func (*Scheduler) JobCount

func (scheduler *Scheduler) JobCount() int

JobCount returns the number of jobs in the current scheduler

func (*Scheduler) JobStats

func (scheduler *Scheduler) JobStats() map[string][]JobStat

JobStats returns all jobs' statistics in the current scheduler.

func (*Scheduler) RemoveAllJobs

func (scheduler *Scheduler) RemoveAllJobs()

RemoveAllJobs removes all jobs from the current scheduler.

func (*Scheduler) RemoveJob

func (scheduler *Scheduler) RemoveJob(name string)

RemoveJob removes a job by name from the current scheduler.

func (*Scheduler) Start

func (scheduler *Scheduler) Start()

Start starts the current scheduler.

func (*Scheduler) Stop

func (scheduler *Scheduler) Stop(force bool)

Stop stops the current scheduler.

func (*Scheduler) StopWithTimeout added in v1.3.1

func (scheduler *Scheduler) StopWithTimeout(timeout time.Duration)

StopWithTimeout stops the current scheduler, waiting running tasks at most `timeout` duration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL