Documentation
¶
Index ¶
- Constants
- Variables
- func AllShardsCanNotDownload(shardDownloadFail error) bool
- func BidsSplit(ctx context.Context, bids []*ShardInfoSimple, taskletSize int) ([]Tasklet, *WorkError)
- func GenMigrateBids(ctx context.Context, blobnodeCli client.IBlobNode, srcReplicas Vunits, ...) (migBids, benchmarkBids []*ShardInfoSimple, wErr *WorkError)
- func GetBids(shardMetas []*ShardInfoSimple) []proto.BlobID
- func GetReplicasBids(ctx context.Context, cli client.IBlobNode, replicas Vunits) map[proto.Vuid]*ReplicaBidsRet
- func NewHandler(service *Service) *rpc.Router
- func ShouldReclaim(e *WorkError) bool
- type Config
- type DataInspectConf
- type DataInspectMgr
- type DataInspectStat
- type ITaskWorker
- type InspectTaskMgr
- type MigrateTaskEx
- type MigrateWorker
- func (w *MigrateWorker) Check(ctx context.Context) *WorkError
- func (w *MigrateWorker) ExecTasklet(ctx context.Context, tasklet Tasklet) *WorkError
- func (w *MigrateWorker) GenTasklets(ctx context.Context) ([]Tasklet, *WorkError)
- func (w *MigrateWorker) GetBenchmarkBids() []*ShardInfoSimple
- func (w *MigrateWorker) OperateArgs() scheduler.OperateTaskArgs
- func (w *MigrateWorker) TaskType() (taskType proto.TaskType)
- type ReplicaBidsRet
- type Service
- func (s *Service) ChunkCompact(c *rpc.Context)
- func (s *Service) ChunkCreate(c *rpc.Context)
- func (s *Service) ChunkInspect(c *rpc.Context)
- func (s *Service) ChunkList(c *rpc.Context)
- func (s *Service) ChunkReadonly(c *rpc.Context)
- func (s *Service) ChunkReadwrite(c *rpc.Context)
- func (s *Service) ChunkRelease(c *rpc.Context)
- func (s *Service) ChunkStat(c *rpc.Context)
- func (s *Service) Close()
- func (s *Service) ConfigReload(c *rpc.Context)
- func (s *Service) DebugStat(c *rpc.Context)
- func (s *Service) DiskProbe(c *rpc.Context)
- func (s *Service) DiskStat(c *rpc.Context)
- func (s *Service) GcRubbishChunk()
- func (s *Service) GetInspectStat(c *rpc.Context)
- func (s *Service) SetInspectRate(c *rpc.Context)
- func (s *Service) ShardDelete(c *rpc.Context)
- func (s *Service) ShardGet(c *rpc.Context)
- func (s *Service) ShardList(c *rpc.Context)
- func (s *Service) ShardMarkdelete(c *rpc.Context)
- func (s *Service) ShardPut(c *rpc.Context)
- func (s *Service) ShardStat(c *rpc.Context)
- func (s *Service) Stat(c *rpc.Context)
- type ShardInfoEx
- type ShardInfoSimple
- type ShardInfoWithCrc
- type ShardRecover
- type ShardRepairer
- type ShardsBuf
- func (shards *ShardsBuf) FetchShard(bid proto.BlobID) ([]byte, error)
- func (shards *ShardsBuf) PlanningDataLayout(bids []*ShardInfoSimple) error
- func (shards *ShardsBuf) PutShard(bid proto.BlobID, input io.Reader) error
- func (shards *ShardsBuf) ShardCrc32(bid proto.BlobID) (crc uint32, err error)
- func (shards *ShardsBuf) ShardSizeIsZero(bid proto.BlobID) bool
- type TaskRunner
- type TaskRunnerMgr
- func (tm *TaskRunnerMgr) AddTask(ctx context.Context, task MigrateTaskEx) error
- func (tm *TaskRunnerMgr) GetAliveTasks() map[proto.TaskType][]string
- func (tm *TaskRunnerMgr) RenewalTaskLoop(stopCh <-chan struct{})
- func (tm *TaskRunnerMgr) RunningTaskCnt() map[proto.TaskType]int
- func (tm *TaskRunnerMgr) StopAllAliveRunner()
- func (tm *TaskRunnerMgr) TaskStats() blobnode.WorkerStats
- type Tasklet
- type Vunits
- type WokeErrorType
- type WorkError
- func CheckVunit(ctx context.Context, expectBids []*ShardInfoSimple, dest proto.VunitLocation, ...) *WorkError
- func DstError(err error) *WorkError
- func MigrateBids(ctx context.Context, shardRecover *ShardRecover, badIdx uint8, ...) *WorkError
- func OtherError(err error) *WorkError
- func SrcError(err error) *WorkError
- type WorkerConfig
- type WorkerConfigMeter
- type WorkerGenerator
- type WorkerService
Constants ¶
const ( DefaultHeartbeatIntervalSec = 30 // 30 s DefaultChunkReportIntervalSec = 60 // 1 min DefaultCleanExpiredStatIntervalSec = 60 * 60 // 60 min DefaultChunkGcIntervalSec = 30 * 60 // 30 min DefaultChunkInspectIntervalSec = 24 * 60 * 60 // 24 hour DefaultChunkProtectionPeriodSec = 48 * 60 * 60 // 48 hour DefaultDiskStatusCheckIntervalSec = 2 * 60 // 2 min DefaultDeleteQpsLimitPerDisk = 128 DefaultInspectRate = 4 * 1024 * 1024 // rate limit 4MB per second )
const ( TickInterval = 1 HeartbeatTicks = 30 ExpiresTicks = 60 )
const ( TaskInit uint8 = iota + 1 TaskRunning TaskStopping TaskSuccess TaskStopped )
task runner status
const (
DefaultShutdownTimeout = 60 * time.Second // 60 s
)
const (
ShardListPageLimit = 65536
)
const (
// StatusInterrupt interrupt error status
StatusInterrupt = 596
)
Variables ¶
var ( ErrNotSupportKey = errors.New("not support this key") ErrValueType = errors.New("value type not match this key") ErrValueOutOfLimit = errors.New("value out of limit") )
var ( ErrNotEnoughWellReplicaCnt = errors.New("well replicas cnt is not enough") ErrNotEnoughBidsInTasklet = errors.New("check len of tasklet and bids is not equal") ErrTaskletSizeInvalid = errors.New("tasklet size is invalid") ErrBidSizeOverTaskletSize = errors.New("bid size is over tasklet size") ErrUnexpected = errors.New("unexpected error when get bench bids") )
var ( // ErrBidMissing bid is missing ErrBidMissing = errors.New("bid is missing") // ErrBidNotMatch bid not match ErrBidNotMatch = errors.New("bid not match") )
var ErrNotReadyForMigrate = errors.New("not ready for migrate")
ErrNotReadyForMigrate not ready for migrate
Functions ¶
func AllShardsCanNotDownload ¶
AllShardsCanNotDownload judges, based on the download error, whether all shards cannot be downloaded
func BidsSplit ¶
func BidsSplit(ctx context.Context, bids []*ShardInfoSimple, taskletSize int) ([]Tasklet, *WorkError)
BidsSplit splits the bids list into multiple tasklets by taskletSize
func GenMigrateBids ¶
func GenMigrateBids(ctx context.Context, blobnodeCli client.IBlobNode, srcReplicas Vunits, dst proto.VunitLocation, mode codemode.CodeMode, badIdxs []uint8, ) (migBids, benchmarkBids []*ShardInfoSimple, wErr *WorkError)
GenMigrateBids generates migrate blob ids
func GetReplicasBids ¶
func GetReplicasBids(ctx context.Context, cli client.IBlobNode, replicas Vunits) map[proto.Vuid]*ReplicaBidsRet
GetReplicasBids returns replicas bids info
func NewHandler ¶
func ShouldReclaim ¶
ShouldReclaim returns true if the task should reclaim
Types ¶
type Config ¶
type Config struct { cmd.Config core.HostInfo WorkerConfig Disks []core.Config `json:"disks"` DiskConfig core.RuntimeConfig `json:"disk_config"` MetaConfig db.MetaConfig `json:"meta_config"` FlockFilename string `json:"flock_filename"` Clustermgr *cmapi.Config `json:"clustermgr"` HeartbeatIntervalSec int `json:"heartbeat_interval_S"` ChunkReportIntervalSec int `json:"chunk_report_interval_S"` ChunkGcIntervalSec int `json:"chunk_gc_interval_S"` ChunkProtectionPeriodSec int `json:"chunk_protection_period_S"` CleanExpiredStatIntervalSec int `json:"clean_expired_stat_interval_S"` DiskStatusCheckIntervalSec int `json:"disk_status_check_interval_S"` DeleteQpsLimitPerDisk int `json:"delete_qps_limit_per_disk"` InspectConf DataInspectConf `json:"inspect_conf"` }
type DataInspectConf ¶
type DataInspectMgr ¶
type DataInspectMgr struct {
// contains filtered or unexported fields
}
func NewDataInspectMgr ¶
func NewDataInspectMgr(svr *Service, conf DataInspectConf, switchMgr *taskswitch.SwitchMgr) (*DataInspectMgr, error)
type DataInspectStat ¶
type DataInspectStat struct { DataInspectConf Open bool `json:"open"` }
type ITaskWorker ¶
type ITaskWorker interface { // split tasklets accord by volume benchmark bids GenTasklets(ctx context.Context) ([]Tasklet, *WorkError) // define tasklet execution operator ,eg:disk repair & migrate ExecTasklet(ctx context.Context, t Tasklet) *WorkError // check whether the task is executed successfully when volume task finish Check(ctx context.Context) *WorkError OperateArgs() scheduler.OperateTaskArgs TaskType() (taskType proto.TaskType) GetBenchmarkBids() []*ShardInfoSimple }
ITaskWorker define interface used for task execution
func NewMigrateWorker ¶
func NewMigrateWorker(task MigrateTaskEx) ITaskWorker
NewMigrateWorker returns migrate worker
type InspectTaskMgr ¶
type InspectTaskMgr struct {
// contains filtered or unexported fields
}
InspectTaskMgr inspect task manager
func NewInspectTaskMgr ¶
func NewInspectTaskMgr(concurrency int, bidGetter client.IBlobNode, reporter scheduler.IInspector) *InspectTaskMgr
NewInspectTaskMgr returns inspect task manager
func (*InspectTaskMgr) AddTask ¶
func (mgr *InspectTaskMgr) AddTask(ctx context.Context, task *proto.VolumeInspectTask) error
AddTask adds inspect task
func (*InspectTaskMgr) RunningTaskSize ¶
func (mgr *InspectTaskMgr) RunningTaskSize() int
RunningTaskSize returns running inspect task size
type MigrateTaskEx ¶
type MigrateTaskEx struct {
// contains filtered or unexported fields
}
MigrateTaskEx migrate task execution machine
type MigrateWorker ¶
type MigrateWorker struct {
// contains filtered or unexported fields
}
MigrateWorker is used to manage a migrate task
func (*MigrateWorker) Check ¶
func (w *MigrateWorker) Check(ctx context.Context) *WorkError
Check checks the migrate task execution result
func (*MigrateWorker) ExecTasklet ¶
func (w *MigrateWorker) ExecTasklet(ctx context.Context, tasklet Tasklet) *WorkError
ExecTasklet executes a migrate tasklet
func (*MigrateWorker) GenTasklets ¶
func (w *MigrateWorker) GenTasklets(ctx context.Context) ([]Tasklet, *WorkError)
GenTasklets generates migrate tasklets
func (*MigrateWorker) GetBenchmarkBids ¶
func (w *MigrateWorker) GetBenchmarkBids() []*ShardInfoSimple
GetBenchmarkBids returns benchmark bids
func (*MigrateWorker) OperateArgs ¶
func (w *MigrateWorker) OperateArgs() scheduler.OperateTaskArgs
OperateArgs args for cancel, complete, reclaim.
func (*MigrateWorker) TaskType ¶
func (w *MigrateWorker) TaskType() (taskType proto.TaskType)
TaskType returns task type
type ReplicaBidsRet ¶
ReplicaBidsRet with bids info and error message
type Service ¶
type Service struct { Disks map[proto.DiskID]core.DiskAPI WorkerService *WorkerService // client handler ClusterMgrClient *cmapi.Client Conf *Config // limiter DeleteQpsLimitPerKey limit.Limiter DeleteQpsLimitPerDisk limit.ResettableLimiter ChunkLimitPerVuid limit.Limiter DiskLimitPerKey limit.Limiter InspectLimiterPerKey limit.Limiter RequestCount int64 // contains filtered or unexported fields }
func NewService ¶
func (*Service) ChunkCompact ¶
* method: POST * url: /chunk/compact/diskid/{diskid}/vuid/{vuid} * request body: json.Marshal(CompactChunkArgs)
func (*Service) ChunkCreate ¶
* method: POST * url: /chunk/create/diskid/{diskid}/vuid/{vuid}?chunksize={chunksize} * request body: json.Marshal(bnapi.ChunkCreateArgs)
func (*Service) ChunkInspect ¶
* method: POST * url: /chunk/inspect/diskid/{diskid}/vuid/{vuid}
func (*Service) ChunkReadonly ¶
* method: POST * url: /chunk/readonly/diskid/{diskid}/vuid/{vuid}/ * request body: json.Marshal(ChunkArgs)
func (*Service) ChunkReadwrite ¶
* method: POST * url: /chunk/readwrite/diskid/{diskid}/vuid/{vuid} * request body: json.Marshal(ChunkArgs)
func (*Service) ChunkRelease ¶
* method: POST * url: /chunk/release/diskid/{diskid}/vuid/{vuid} * request body: json.Marshal(ChunkArgs)
func (*Service) ChunkStat ¶
* method: GET * url: /chunk/stat/diskid/{diskid}/vuid/{vuid} * response body: json.Marshal(ChunkInfo)
func (*Service) ConfigReload ¶
key:disk_bandwidth_MBPS,disk_iops,level0.bandwidth_MBPS,level1.iops ...
func (*Service) DebugStat ¶
* method: GET * url: /debug/stat * response body: json.Marshal([]DiskInfo)
func (*Service) DiskProbe ¶
* method: POST * url: /disk/probe * request body: json.Marshal(DiskProbeArgs)
func (*Service) DiskStat ¶
* method: GET * url: /disk/stat/diskid/{diskid} * response body: json.Marshal(DiskInfo)
func (*Service) GcRubbishChunk ¶
func (s *Service) GcRubbishChunk()
func (*Service) GetInspectStat ¶
func (*Service) SetInspectRate ¶
func (*Service) ShardDelete ¶
* method: POST * url: /shard/delete/diskid/{diskid}/vuid/{vuid}/bid/{bid} * request body: json.Marshal(deleteArgs)
func (*Service) ShardGet ¶
* method: GET * url: /shard/get/diskid/{diskid}/vuid/{vuid}/bid/{bid}?iotype={iotype} * response body: bidData
func (*Service) ShardList ¶
* method: GET * url: /shard/list/diskid/{diskid}/vuid/{vuid}/startbid/{bid}/status/{status}/count/{count} * response body: Marshal([]*bnapi.ShardInfo)
func (*Service) ShardMarkdelete ¶
* method: POST * url: /shard/markdelete/diskid/{diskid}/vuid/{vuid}/bid/{bid} * request body: json.Marshal(deleteArgs)
func (*Service) ShardPut ¶
* method: POST * url: /shard/put/diskid/{diskid}/vuid/{vuid}/bid/{bid}/size/{size}?iotype={iotype} * request body: bidData
type ShardInfoEx ¶
type ShardInfoEx struct {
// contains filtered or unexported fields
}
ShardInfoEx shard info execution
func (*ShardInfoEx) BadReason ¶
func (shard *ShardInfoEx) BadReason() error
BadReason returns bad reason
func (*ShardInfoEx) IsBad ¶
func (shard *ShardInfoEx) IsBad() bool
IsBad returns true if the shard returned an error
func (*ShardInfoEx) MarkDeleted ¶
func (shard *ShardInfoEx) MarkDeleted() bool
MarkDeleted returns true if no error was returned and the shard is marked deleted
func (*ShardInfoEx) Normal ¶
func (shard *ShardInfoEx) Normal() bool
Normal returns true if no error was returned and the shard status is normal
func (*ShardInfoEx) NotExist ¶
func (shard *ShardInfoEx) NotExist() bool
NotExist returns true if no error was returned and the shard does not exist
func (*ShardInfoEx) ShardSize ¶
func (shard *ShardInfoEx) ShardSize() int64
ShardSize returns shard size
type ShardInfoSimple ¶
ShardInfoSimple with blob id and size
func GetBenchmarkBids ¶
func GetBenchmarkBids(ctx context.Context, cli client.IBlobNode, replicas Vunits, mode codemode.CodeMode, badIdxs []uint8) (bids []*ShardInfoSimple, err error)
GetBenchmarkBids returns the benchmark bids
func MergeBids ¶
func MergeBids(replicasBids map[proto.Vuid]*ReplicaBidsRet) []*ShardInfoSimple
MergeBids merges bids
type ShardInfoWithCrc ¶
ShardInfoWithCrc with blob id and size and crc
func GetSingleVunitNormalBids ¶
func GetSingleVunitNormalBids(ctx context.Context, cli client.IBlobNode, replica proto.VunitLocation) (bids []*ShardInfoWithCrc, err error)
GetSingleVunitNormalBids returns single volume unit bids info
type ShardRecover ¶
type ShardRecover struct {
// contains filtered or unexported fields
}
ShardRecover used to recover shard data
func NewShardRecover ¶
func NewShardRecover(replicas Vunits, mode codemode.CodeMode, bidInfos []*ShardInfoSimple, shardGetter client.IBlobNode, vunitShardGetConcurrency int, taskType proto.TaskType, ) *ShardRecover
NewShardRecover returns shard recover
func (*ShardRecover) RecoverShards ¶
RecoverShards recovers shards
func (*ShardRecover) ReleaseBuf ¶
func (r *ShardRecover) ReleaseBuf()
ReleaseBuf releases the chunks shards buffer
type ShardRepairer ¶
type ShardRepairer struct {
// contains filtered or unexported fields
}
ShardRepairer used to repair shard data
func NewShardRepairer ¶
func NewShardRepairer(cli client.IBlobNode) *ShardRepairer
NewShardRepairer returns shard repairer
func (*ShardRepairer) RepairShard ¶
func (repairer *ShardRepairer) RepairShard(ctx context.Context, task *proto.ShardRepairTask) error
RepairShard repairs shard data
type ShardsBuf ¶
type ShardsBuf struct {
// contains filtered or unexported fields
}
ShardsBuf used to store shard data in memory
func (*ShardsBuf) FetchShard ¶
FetchShard returns shard data
func (*ShardsBuf) PlanningDataLayout ¶
func (shards *ShardsBuf) PlanningDataLayout(bids []*ShardInfoSimple) error
PlanningDataLayout plans the data layout
func (*ShardsBuf) ShardCrc32 ¶
ShardCrc32 returns shard crc32
type TaskRunner ¶
type TaskRunner struct {
// contains filtered or unexported fields
}
TaskRunner used to manage task
func NewTaskRunner ¶
func NewTaskRunner(ctx context.Context, taskID string, w ITaskWorker, idc string, taskletRunConcurrency int, taskCounter *taskCounter, schedulerCli scheduler.IMigrator) *TaskRunner
NewTaskRunner returns a task runner
func (*TaskRunner) Stopped ¶
func (r *TaskRunner) Stopped() bool
Stopped returns true if task is stopped
type TaskRunnerMgr ¶
type TaskRunnerMgr struct {
// contains filtered or unexported fields
}
TaskRunnerMgr task runner manager
func NewTaskRunnerMgr ¶
func NewTaskRunnerMgr(idc string, meter WorkerConfigMeter, genWorker WorkerGenerator, renewalCli, schedulerCli scheduler.IMigrator) *TaskRunnerMgr
NewTaskRunnerMgr returns task runner manager
func (*TaskRunnerMgr) AddTask ¶
func (tm *TaskRunnerMgr) AddTask(ctx context.Context, task MigrateTaskEx) error
AddTask adds a migrate task.
func (*TaskRunnerMgr) GetAliveTasks ¶
func (tm *TaskRunnerMgr) GetAliveTasks() map[proto.TaskType][]string
GetAliveTasks returns all alive migrate task.
func (*TaskRunnerMgr) RenewalTaskLoop ¶
func (tm *TaskRunnerMgr) RenewalTaskLoop(stopCh <-chan struct{})
RenewalTaskLoop renews tasks.
func (*TaskRunnerMgr) RunningTaskCnt ¶
func (tm *TaskRunnerMgr) RunningTaskCnt() map[proto.TaskType]int
RunningTaskCnt returns the running task count
func (*TaskRunnerMgr) StopAllAliveRunner ¶
func (tm *TaskRunnerMgr) StopAllAliveRunner()
StopAllAliveRunner stops all alive runner
func (*TaskRunnerMgr) TaskStats ¶
func (tm *TaskRunnerMgr) TaskStats() blobnode.WorkerStats
TaskStats returns the task counter result.
type Tasklet ¶
type Tasklet struct {
// contains filtered or unexported fields
}
Tasklet is the smallest unit of task execution
func (*Tasklet) DataSizeByte ¶
DataSizeByte returns total bids size
type Vunits ¶
type Vunits []proto.VunitLocation
Vunits volume stripe locations.
func (Vunits) IntactGlobalSet ¶
type WokeErrorType ¶
type WokeErrorType uint8
WokeErrorType worker error type
const ( DstErr WokeErrorType = iota + 1 SrcErr OtherErr )
task runner error type
type WorkError ¶
type WorkError struct {
// contains filtered or unexported fields
}
WorkError with error type and error
func CheckVunit ¶
func CheckVunit(ctx context.Context, expectBids []*ShardInfoSimple, dest proto.VunitLocation, blobnodeCli client.IBlobNode) *WorkError
CheckVunit checks volume unit info
func MigrateBids ¶
func MigrateBids(ctx context.Context, shardRecover *ShardRecover, badIdx uint8, destLocation proto.VunitLocation, direct bool, bids []*ShardInfoSimple, blobnodeCli client.IBlobNode) *WorkError
MigrateBids migrate the bids data to destination
type WorkerConfig ¶
type WorkerConfig struct { WorkerConfigMeter // buffer pool use for migrate and repair shard repair BufPoolConf base.BufConfig `json:"buf_pool_conf"` // acquire task period AcquireIntervalMs int `json:"acquire_interval_ms"` // scheduler client config Scheduler scheduler.Config `json:"scheduler"` // blobnode client config BlobNode bnapi.Config `json:"blobnode"` DroppedBidRecord *recordlog.Config `json:"dropped_bid_record"` }
WorkerConfig worker service config
type WorkerConfigMeter ¶
type WorkerConfigMeter struct { // max task run count of disk repair & balance & disk drop MaxTaskRunnerCnt int `json:"max_task_runner_cnt"` // tasklet concurrency of single repair task RepairConcurrency int `json:"repair_concurrency"` // tasklet concurrency of single balance task BalanceConcurrency int `json:"balance_concurrency"` // tasklet concurrency of single disk drop task DiskDropConcurrency int `json:"disk_drop_concurrency"` // tasklet concurrency of single manual migrate task ManualMigrateConcurrency int `json:"manual_migrate_concurrency"` // shard repair concurrency ShardRepairConcurrency int `json:"shard_repair_concurrency"` // volume inspect concurrency InspectConcurrency int `json:"inspect_concurrency"` // batch download concurrency of single tasklet DownloadShardConcurrency int `json:"download_shard_concurrency"` }
WorkerConfigMeter worker controller meter.
type WorkerGenerator ¶
type WorkerGenerator = func(task MigrateTaskEx) ITaskWorker
WorkerGenerator generates task worker.
type WorkerService ¶
type WorkerService struct { closer.Closer WorkerConfig // contains filtered or unexported fields }
WorkerService is the worker service
func NewWorkerService ¶
func NewWorkerService(cfg *WorkerConfig, service cmapi.APIService, clusterID proto.ClusterID, idc string) (*WorkerService, error)
NewWorkerService returns rpc worker_service
func (*WorkerService) ShardRepair ¶
func (s *WorkerService) ShardRepair(c *rpc.Context)
ShardRepair repairs a shard
func (*WorkerService) WorkerStats ¶
func (s *WorkerService) WorkerStats(c *rpc.Context)
WorkerStats returns worker_service stats
Source Files
¶
- chunk.go
- chunkcheck.go
- chunkreport.go
- compact.go
- config.go
- datainspect.go
- disk.go
- heartbeat.go
- iostat.go
- service.go
- shard.go
- startup.go
- svr.go
- task_inspect_mgr.go
- task_runner.go
- task_runner_mgr.go
- task_runner_migrate.go
- task_shard_repair.go
- work_shard_getter.go
- work_shard_recover.go
- work_shards_migrate.go
- worker_service.go