Documentation
¶
Index ¶
- Constants
- Variables
- func AllocVunitSafe(ctx context.Context, cli IAllocVunit, vuid comproto.Vuid, ...) (ret *client.AllocVunitInfo, err error)
- func DataMountFormat(dataMountBytes [counter.SLOT]int) string
- func FormatPrint(statsInfos []ErrorPercent) (res []string)
- func GenTaskID(prefix string, vid comproto.Vid) string
- func InsistOn(ctx context.Context, errMsg string, on func() error)
- func NewCounter(clusterID proto.ClusterID, taskType string, kind string) prometheus.Counter
- func ShouldAllocAndRedo(errCode int) bool
- func Subtraction(a, b []comproto.Vuid) (c []comproto.Vuid)
- type ClusterTopologyStatsMgr
- type Consumer
- type ConsumerPause
- type ErrorPercent
- type ErrorStats
- type GroupConsumer
- type IAllocVunit
- type IProducer
- type KafkaConfig
- type KafkaConsumer
- type KafkaConsumerCfg
- type KafkaConsumerGroup
- type KafkaTopicMonitor
- type Queue
- func (q *Queue) Get(id string) (interface{}, error)
- func (q *Queue) Pop() (string, interface{}, bool)
- func (q *Queue) Push(id string, msg interface{}) error
- func (q *Queue) Remove(id string) error
- func (q *Queue) Requeue(id string, delay time.Duration) error
- func (q *Queue) Stats() (todo, doing int)
- type TaskCntStats
- type TaskCommonConfig
- type TaskQueue
- func (q *TaskQueue) PopTask() (string, WorkerTask, bool)
- func (q *TaskQueue) PushTask(taskID string, task WorkerTask)
- func (q *TaskQueue) Query(taskID string) (WorkerTask, bool)
- func (q *TaskQueue) RemoveTask(taskID string) error
- func (q *TaskQueue) RetryTask(taskID string)
- func (q *TaskQueue) StatsTasks() (todo int, doing int)
- type TaskRunDetailInfo
- type TaskStatsMgr
- func (statsMgr *TaskStatsMgr) CancelTask()
- func (statsMgr *TaskStatsMgr) Counters() (increaseDataSize, increaseShardCnt [counter.SLOT]int)
- func (statsMgr *TaskStatsMgr) QueryTaskDetail(taskID string) (detail TaskRunDetailInfo, err error)
- func (statsMgr *TaskStatsMgr) ReclaimTask()
- func (statsMgr *TaskStatsMgr) ReportTaskCntLoop()
- func (statsMgr *TaskStatsMgr) ReportWorkerTaskStats(taskID string, s proto.TaskStatistics, increaseDataSize, increaseShardCnt int)
- type VolTaskLocker
- type WorkerTask
- type WorkerTaskQueue
- func (q *WorkerTaskQueue) Acquire(idc string) (taskID string, wtask WorkerTask, exist bool)
- func (q *WorkerTaskQueue) AddPreparedTask(idc, taskID string, wtask WorkerTask)
- func (q *WorkerTaskQueue) Cancel(idc, taskID string, src []proto.VunitLocation, dst proto.VunitLocation) error
- func (q *WorkerTaskQueue) Complete(idc, taskID string, src []proto.VunitLocation, dst proto.VunitLocation) (WorkerTask, error)
- func (q *WorkerTaskQueue) Query(idc, taskID string) (WorkerTask, error)
- func (q *WorkerTaskQueue) Reclaim(idc, taskID string, src []proto.VunitLocation, ...) error
- func (q *WorkerTaskQueue) Renewal(idc, taskID string) error
- func (q *WorkerTaskQueue) SetLeaseExpiredS(dura time.Duration)
- func (q *WorkerTaskQueue) StatsTasks() (todo int, doing int)
Constants ¶
const ( KindFailed = "failed" KindSuccess = "success" )
statistics stats
const ( // EmptyDiskID empty diskID EmptyDiskID = proto.DiskID(0) )
Variables ¶
var ( // ErrNoSuchMessageID no such message id ErrNoSuchMessageID = errors.New("no such message id") // ErrUnmatchedVuids unmatched task vuids ErrUnmatchedVuids = errors.New("unmatched task vuids") )
var ( ErrNoTaskInQueue = errors.New("no task in queue") ErrVolNotOnlyOneTask = errors.New("vol not only one task running") ErrUpdateVolumeCache = errors.New("update volume cache failed") )
err use for task
var Buckets = []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000}
Buckets default buckets for stats
var ( // ErrVidTaskConflict vid task conflict ErrVidTaskConflict = errors.New("vid task conflict") )
make sure only one task in same volume to run in cluster
var NewVolTaskLockerOnce sync.Once
NewVolTaskLockerOnce singleton mode:make sure only one instance in global
Functions ¶
func AllocVunitSafe ¶
func AllocVunitSafe( ctx context.Context, cli IAllocVunit, vuid comproto.Vuid, volReplicas []comproto.VunitLocation) (ret *client.AllocVunitInfo, err error)
AllocVunitSafe alloc volume unit safe
func DataMountFormat ¶
DataMountFormat format data
func FormatPrint ¶
func FormatPrint(statsInfos []ErrorPercent) (res []string)
FormatPrint format print message
func NewCounter ¶
NewCounter returns statistics counter
func ShouldAllocAndRedo ¶
ShouldAllocAndRedo return true if should alloc and redo task
Types ¶
type ClusterTopologyStatsMgr ¶
type ClusterTopologyStatsMgr struct {
// contains filtered or unexported fields
}
ClusterTopologyStatsMgr cluster topology stats manager
func NewClusterTopologyStatisticsMgr ¶
func NewClusterTopologyStatisticsMgr(clusterID proto.ClusterID, buckets []float64) *ClusterTopologyStatsMgr
NewClusterTopologyStatisticsMgr returns cluster topology stats manager
func (*ClusterTopologyStatsMgr) ReportFreeChunk ¶
func (statsMgr *ClusterTopologyStatsMgr) ReportFreeChunk(disk *api.DiskInfoSimple)
ReportFreeChunk report free chunk
type Consumer ¶
type Consumer struct { ConsumeFn func(msg []*sarama.ConsumerMessage, consumerPause ConsumerPause) bool closer.Closer // contains filtered or unexported fields }
func (*Consumer) Cleanup ¶
func (consumer *Consumer) Cleanup(session sarama.ConsumerGroupSession) error
func (*Consumer) ConsumeClaim ¶
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
type ConsumerPause ¶
type ConsumerPause interface {
Done() <-chan struct{}
}
type ErrorPercent ¶
type ErrorPercent struct {
// contains filtered or unexported fields
}
ErrorPercent error percent
type ErrorStats ¶
type ErrorStats struct {
// contains filtered or unexported fields
}
ErrorStats error stats
func (*ErrorStats) Stats ¶
func (es *ErrorStats) Stats() (statsResult []ErrorPercent, totalErrCnt uint64)
Stats returns stats
type GroupConsumer ¶
type GroupConsumer interface {
Stop()
}
type IAllocVunit ¶
type IAllocVunit interface {
AllocVolumeUnit(ctx context.Context, vuid comproto.Vuid) (ret *client.AllocVunitInfo, err error)
}
IAllocVunit define the interface of clustermgr used for volume alloc
type IProducer ¶
type IProducer interface { SendMessage(msg []byte) (err error) SendMessages(msgs [][]byte) (err error) }
IProducer define the interface of producer
func NewMsgSender ¶
func NewMsgSender(cfg *kafka.ProducerCfg) (IProducer, error)
NewMsgSender returns message sender
type KafkaConfig ¶
KafkaConfig kafka config
type KafkaConsumer ¶
type KafkaConsumer interface {
StartKafkaConsumer(cfg KafkaConsumerCfg, fn func(msg []*sarama.ConsumerMessage, consumerPause ConsumerPause) bool) (GroupConsumer, error)
}
func NewKafkaConsumer ¶
func NewKafkaConsumer(brokers []string) KafkaConsumer
type KafkaConsumerCfg ¶
type KafkaConsumerGroup ¶
type KafkaConsumerGroup struct {
// contains filtered or unexported fields
}
func (*KafkaConsumerGroup) Stop ¶
func (cg *KafkaConsumerGroup) Stop()
type KafkaTopicMonitor ¶
KafkaTopicMonitor kafka monitor
func NewKafkaTopicMonitor ¶
func NewKafkaTopicMonitor(taskType proto.TaskType, clusterID proto.ClusterID, cfg *KafkaConfig) (*KafkaTopicMonitor, error)
NewKafkaTopicMonitor returns kafka topic monitor
func (*KafkaTopicMonitor) Close ¶
func (m *KafkaTopicMonitor) Close()
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue task queue
type TaskCntStats ¶
type TaskCntStats interface {
StatQueueTaskCnt() (preparing, workerDoing, finishing int)
}
TaskCntStats information of task running on worker
type TaskCommonConfig ¶
type TaskCommonConfig struct { PrepareQueueRetryDelayS int `json:"prepare_queue_retry_delay_s"` FinishQueueRetryDelayS int `json:"finish_queue_retry_delay_s"` CancelPunishDurationS int `json:"cancel_punish_duration_s"` WorkQueueSize int `json:"work_queue_size"` CollectTaskIntervalS int `json:"collect_task_interval_s"` CheckTaskIntervalS int `json:"check_task_interval_s"` DiskConcurrency int `json:"disk_concurrency"` }
TaskCommonConfig task common config
func (*TaskCommonConfig) CheckAndFix ¶
func (conf *TaskCommonConfig) CheckAndFix()
CheckAndFix check and fix task common config
type TaskQueue ¶
type TaskQueue struct {
// contains filtered or unexported fields
}
TaskQueue task queue
func NewTaskQueue ¶
NewTaskQueue returns task queue
func (*TaskQueue) PopTask ¶
func (q *TaskQueue) PopTask() (string, WorkerTask, bool)
PopTask return args: taskID, task, flag of task exist
func (*TaskQueue) PushTask ¶
func (q *TaskQueue) PushTask(taskID string, task WorkerTask)
PushTask push task to queue
func (*TaskQueue) Query ¶
func (q *TaskQueue) Query(taskID string) (WorkerTask, bool)
Query find task by taskID
func (*TaskQueue) RemoveTask ¶
RemoveTask remove task by taskID
func (*TaskQueue) StatsTasks ¶
StatsTasks returns task stats
type TaskRunDetailInfo ¶
type TaskRunDetailInfo struct { Statistics proto.TaskStatistics `json:"statistics"` StartTime time.Time `json:"start_time"` CompleteTime time.Time `json:"complete_time"` Completed bool `json:"completed"` }
TaskRunDetailInfo task run detail info
type TaskStatsMgr ¶
type TaskStatsMgr struct { TaskRunInfos map[string]TaskRunDetailInfo // contains filtered or unexported fields }
TaskStatsMgr task stats manager
func NewTaskStatsMgr ¶
func NewTaskStatsMgr(clusterID proto.ClusterID, taskType proto.TaskType) *TaskStatsMgr
NewTaskStatsMgr returns task stats manager
func NewTaskStatsMgrAndRun ¶
func NewTaskStatsMgrAndRun(clusterID proto.ClusterID, taskType proto.TaskType, taskCntStats TaskCntStats) *TaskStatsMgr
NewTaskStatsMgrAndRun run task stats manager
func (*TaskStatsMgr) Counters ¶
func (statsMgr *TaskStatsMgr) Counters() (increaseDataSize, increaseShardCnt [counter.SLOT]int)
Counters returns task stats counters
func (*TaskStatsMgr) QueryTaskDetail ¶
func (statsMgr *TaskStatsMgr) QueryTaskDetail(taskID string) (detail TaskRunDetailInfo, err error)
QueryTaskDetail find task detail info
func (*TaskStatsMgr) ReclaimTask ¶
func (statsMgr *TaskStatsMgr) ReclaimTask()
ReclaimTask reclaim task
func (*TaskStatsMgr) ReportTaskCntLoop ¶
func (statsMgr *TaskStatsMgr) ReportTaskCntLoop()
ReportTaskCntLoop report task count
func (*TaskStatsMgr) ReportWorkerTaskStats ¶
func (statsMgr *TaskStatsMgr) ReportWorkerTaskStats(taskID string, s proto.TaskStatistics, increaseDataSize, increaseShardCnt int)
ReportWorkerTaskStats report worker task stats
type VolTaskLocker ¶
type VolTaskLocker struct {
// contains filtered or unexported fields
}
VolTaskLocker volume task locker
func VolTaskLockerInst ¶
func VolTaskLockerInst() *VolTaskLocker
VolTaskLockerInst ensure that only one background task is executing on the same volume
type WorkerTask ¶
type WorkerTask interface { GetSources() []proto.VunitLocation GetDestination() proto.VunitLocation SetDestination(dest proto.VunitLocation) }
WorkerTask define worker task interface
type WorkerTaskQueue ¶
type WorkerTaskQueue struct {
// contains filtered or unexported fields
}
WorkerTaskQueue task queue for worker
func NewWorkerTaskQueue ¶
func NewWorkerTaskQueue(cancelPunishDuration time.Duration) *WorkerTaskQueue
NewWorkerTaskQueue return worker task queue
func (*WorkerTaskQueue) Acquire ¶
func (q *WorkerTaskQueue) Acquire(idc string) (taskID string, wtask WorkerTask, exist bool)
Acquire acquire task by idc
func (*WorkerTaskQueue) AddPreparedTask ¶
func (q *WorkerTaskQueue) AddPreparedTask(idc, taskID string, wtask WorkerTask)
AddPreparedTask add prepared task
func (*WorkerTaskQueue) Cancel ¶
func (q *WorkerTaskQueue) Cancel(idc, taskID string, src []proto.VunitLocation, dst proto.VunitLocation) error
Cancel cancel task
func (*WorkerTaskQueue) Complete ¶
func (q *WorkerTaskQueue) Complete(idc, taskID string, src []proto.VunitLocation, dst proto.VunitLocation) (WorkerTask, error)
Complete complete task
func (*WorkerTaskQueue) Query ¶
func (q *WorkerTaskQueue) Query(idc, taskID string) (WorkerTask, error)
Query find task by idc and taskID
func (*WorkerTaskQueue) Reclaim ¶
func (q *WorkerTaskQueue) Reclaim(idc, taskID string, src []proto.VunitLocation, oldDest, newDest proto.VunitLocation, newDiskID proto.DiskID) error
Reclaim reclaim task
func (*WorkerTaskQueue) Renewal ¶
func (q *WorkerTaskQueue) Renewal(idc, taskID string) error
Renewal renewal task
func (*WorkerTaskQueue) SetLeaseExpiredS ¶
func (q *WorkerTaskQueue) SetLeaseExpiredS(dura time.Duration)
SetLeaseExpiredS set lease expired time
func (*WorkerTaskQueue) StatsTasks ¶
func (q *WorkerTaskQueue) StatsTasks() (todo int, doing int)
StatsTasks returns task stats