dispatch

package
v1.43.0
Published: May 1, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package dispatch contains logic to dispatch requests locally or to other nodes.

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddResponseMetadata added in v1.12.0

func AddResponseMetadata(existing *v1.ResponseMeta, incoming *v1.ResponseMeta)

AddResponseMetadata adds the metadata found in the incoming metadata to the existing metadata, *modifying it in place*.

func CheckDepth

func CheckDepth(ctx context.Context, req DispatchableRequest) error

CheckDepth returns a max-depth-exceeded error (see MaxDepthExceededError) if there is insufficient depth remaining to dispatch.

func NewMaxDepthExceededError added in v1.23.0

func NewMaxDepthExceededError(req DispatchableRequest) error

NewMaxDepthExceededError creates a new MaxDepthExceededError.

Types

type Check

type Check interface {
	// DispatchCheck submits a single check request and returns its result.
	DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error)
}

Check interface describes just the methods required to dispatch check requests.

type CollectingDispatchStream added in v1.8.0

type CollectingDispatchStream[T any] struct {
	// contains filtered or unexported fields
}

CollectingDispatchStream is a dispatch stream that collects results in memory.

func NewCollectingDispatchStream added in v1.8.0

func NewCollectingDispatchStream[T any](ctx context.Context) *CollectingDispatchStream[T]

NewCollectingDispatchStream creates a new CollectingDispatchStream.

func (*CollectingDispatchStream[T]) Context added in v1.8.0

func (s *CollectingDispatchStream[T]) Context() context.Context

func (*CollectingDispatchStream[T]) Publish added in v1.8.0

func (s *CollectingDispatchStream[T]) Publish(result T) error

func (*CollectingDispatchStream[T]) Results added in v1.8.0

func (s *CollectingDispatchStream[T]) Results() []T

type CountingDispatchStream added in v1.22.0

type CountingDispatchStream[T any] struct {
	Stream Stream[T]
	// contains filtered or unexported fields
}

CountingDispatchStream is a dispatch stream that counts the number of items published. It uses an internal atomic int to ensure it is thread safe.

func NewCountingDispatchStream added in v1.22.0

func NewCountingDispatchStream[T any](wrapped Stream[T]) *CountingDispatchStream[T]

func (*CountingDispatchStream[T]) Context added in v1.22.0

func (s *CountingDispatchStream[T]) Context() context.Context

func (*CountingDispatchStream[T]) Publish added in v1.22.0

func (s *CountingDispatchStream[T]) Publish(result T) error

func (*CountingDispatchStream[T]) PublishedCount added in v1.22.0

func (s *CountingDispatchStream[T]) PublishedCount() uint64

type DispatchableRequest added in v1.23.0

type DispatchableRequest interface {
	zerolog.LogObjectMarshaler

	GetMetadata() *v1.ResolverMeta
}

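DispatchableRequest is the interface implemented by requests that can be dispatched: each exposes its resolver metadata and can be marshaled as a zerolog log object.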

type Dispatcher

type Dispatcher interface {
	Check
	Expand
	LookupSubjects
	LookupResources2

	// Close closes the dispatcher.
	Close() error

	// ReadyState reports whether the dispatcher is able to respond to requests.
	ReadyState() ReadyState
}

Dispatcher interface describes a method for passing subchecks off to additional machines.

type Expand

type Expand interface {
	// DispatchExpand submits a single expand request and returns its result.
	// If an error is returned, DispatchExpandResponse will still contain Metadata.
	DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error)
}

Expand interface describes just the methods required to dispatch expand requests.

type HandlingDispatchStream added in v1.12.0

type HandlingDispatchStream[T any] struct {
	// contains filtered or unexported fields
}

HandlingDispatchStream is a dispatch stream that executes a handler for each item published. It uses an internal mutex to ensure it is thread safe.

func (*HandlingDispatchStream[T]) Context added in v1.12.0

func (s *HandlingDispatchStream[T]) Context() context.Context

func (*HandlingDispatchStream[T]) Publish added in v1.12.0

func (s *HandlingDispatchStream[T]) Publish(result T) error

type LookupResources2 added in v1.35.0

type LookupResources2 interface {
	DispatchLookupResources2(
		req *v1.DispatchLookupResources2Request,
		stream LookupResources2Stream,
	) error
}

type LookupResources2Stream added in v1.35.0

type LookupResources2Stream = Stream[*v1.DispatchLookupResources2Response]

type LookupSubjects added in v1.12.0

type LookupSubjects interface {
	// DispatchLookupSubjects submits a single lookup subjects request, writing its results to the specified stream.
	DispatchLookupSubjects(
		req *v1.DispatchLookupSubjectsRequest,
		stream LookupSubjectsStream,
	) error
}

LookupSubjects interface describes just the methods required to dispatch lookup subjects requests.

type LookupSubjectsStream added in v1.12.0

type LookupSubjectsStream = Stream[*v1.DispatchLookupSubjectsResponse]

LookupSubjectsStream is an alias for the stream to which found subjects will be written.

type MaxDepthExceededError added in v1.23.0

type MaxDepthExceededError struct {

	// Request is the request that exceeded the maximum depth.
	Request DispatchableRequest
	// contains filtered or unexported fields
}

MaxDepthExceededError is an error returned when the maximum depth for dispatching has been exceeded.

func (MaxDepthExceededError) GRPCStatus added in v1.35.3

func (err MaxDepthExceededError) GRPCStatus() *status.Status

GRPCStatus implements retrieving the gRPC status for the error.

type ReadyState added in v1.18.1

type ReadyState struct {
	// Message is a human-readable status message for the current state.
	Message string

	// IsReady indicates whether the dispatcher is ready.
	IsReady bool
}

ReadyState represents the ready state of the dispatcher.

type Stream added in v1.8.0

type Stream[T any] interface {
	// Publish publishes the result to the stream.
	Publish(T) error

	// Context returns the context for the stream.
	Context() context.Context
}

Stream defines the interface generically matching a streaming dispatch response.

func NewHandlingDispatchStream added in v1.12.0

func NewHandlingDispatchStream[T any](ctx context.Context, processor func(result T) error) Stream[T]

NewHandlingDispatchStream returns a new handling dispatch stream.

func StreamWithContext added in v1.8.0

func StreamWithContext[T any](context context.Context, stream Stream[T]) Stream[T]

StreamWithContext returns the given dispatch stream, wrapped to return the given context.

func WrapGRPCStream added in v1.8.0

func WrapGRPCStream[R any, S grpcStream[R]](grpcStream S) Stream[R]

WrapGRPCStream wraps a gRPC result stream with a concurrent-safe dispatch stream. This is necessary because gRPC response streams are *not concurrent safe*. See: https://groups.google.com/g/grpc-io/c/aI6L6M4fzQ0?pli=1

type WrappedDispatchStream added in v1.8.0

type WrappedDispatchStream[T any] struct {
	Stream    Stream[T]
	Ctx       context.Context
	Processor func(result T) (T, bool, error)
}

WrappedDispatchStream is a dispatch stream that wraps another dispatch stream and runs Processor on each result before forwarding it to the parent stream.

func (*WrappedDispatchStream[T]) Context added in v1.8.0

func (s *WrappedDispatchStream[T]) Context() context.Context

func (*WrappedDispatchStream[T]) Publish added in v1.8.0

func (s *WrappedDispatchStream[T]) Publish(result T) error

Directories

Path Synopsis
Package combined implements a dispatcher that combines caching, redispatching and optional cluster dispatching.
Code generated by github.com/ecordell/optgen.
