最佳实践
本文档提供使用 PyPI Crawler 的最佳实践建议,帮助您构建高效、稳定的应用程序。
📋 目录
性能优化
1. 选择合适的镜像源
go
// selectOptimalMirror picks a mirror client from a coarse location check:
// the Tsinghua mirror when isInChina reports true, the official index otherwise.
func selectOptimalMirror() api.PyPIClient {
	if !isInChina() {
		// Users outside mainland China.
		return mirrors.NewOfficialClient()
	}
	// Users in mainland China.
	return mirrors.NewTsinghuaClient()
}
func isInChina() bool {
// 简单的地区检测逻辑
// 实际应用中可以使用更精确的方法
return strings.Contains(os.Getenv("TZ"), "Asia/Shanghai")
}2. 复用客户端实例
go
// ❌ Anti-pattern: a new client is constructed on every loop iteration.
// NOTE(review): packages and ctx are not declared in this snippet; presumably
// they come from the surrounding example code.
func badExample() {
for _, pkg := range packages {
client := mirrors.NewTsinghuaClient() // a fresh client is created on every pass
info, _ := client.GetPackageInfo(ctx, pkg)
// ...
}
}
// ✅ 正确做法:复用客户端
func goodExample() {
client := mirrors.NewTsinghuaClient() // 创建一次
for _, pkg := range packages {
info, _ := client.GetPackageInfo(ctx, pkg)
// ...
}
}3. 合理设置超时时间
go
func createOptimizedClient() api.PyPIClient {
options := client.NewOptions()
// 根据操作类型设置不同的超时时间
switch operationType {
case "single_query":
options.WithTimeout(10 * time.Second)
case "batch_operation":
options.WithTimeout(5 * time.Minute)
case "full_index":
options.WithTimeout(10 * time.Minute)
}
return mirrors.NewTsinghuaClient(options)
}4. 使用连接池
go
type ClientPool struct {
clients chan api.PyPIClient
factory func() api.PyPIClient
}
func NewClientPool(size int, factory func() api.PyPIClient) *ClientPool {
pool := &ClientPool{
clients: make(chan api.PyPIClient, size),
factory: factory,
}
// 预填充连接池
for i := 0; i < size; i++ {
pool.clients <- factory()
}
return pool
}
func (p *ClientPool) Get() api.PyPIClient {
select {
case client := <-p.clients:
return client
default:
return p.factory()
}
}
func (p *ClientPool) Put(client api.PyPIClient) {
select {
case p.clients <- client:
default:
// 池已满,丢弃客户端
}
}错误处理策略
1. 分层错误处理
go
// PyPIError is a classified error: a coarse category, a human-readable
// message, the underlying cause and a flag telling callers whether retrying
// may help.
type PyPIError struct {
	Type    string // category label, e.g. "NOT_FOUND" or "TIMEOUT"
	Message string // human-readable description
	Cause   error  // original error that was classified
	Retry   bool   // true when the operation may be retried
}

// Error formats the error as "[Type] Message".
func (e *PyPIError) Error() string {
	return fmt.Sprintf("[%s] %s", e.Type, e.Message)
}

// Unwrap exposes Cause so errors.Is and errors.As can inspect the original
// error through a PyPIError.
func (e *PyPIError) Unwrap() error {
	return e.Cause
}
func classifyError(err error) *PyPIError {
errStr := err.Error()
switch {
case strings.Contains(errStr, "404"):
return &PyPIError{
Type: "NOT_FOUND",
Message: "包不存在",
Cause: err,
Retry: false,
}
case strings.Contains(errStr, "timeout"):
return &PyPIError{
Type: "TIMEOUT",
Message: "请求超时",
Cause: err,
Retry: true,
}
case strings.Contains(errStr, "5"):
return &PyPIError{
Type: "SERVER_ERROR",
Message: "服务器错误",
Cause: err,
Retry: true,
}
default:
return &PyPIError{
Type: "UNKNOWN",
Message: "未知错误",
Cause: err,
Retry: false,
}
}
}2. 智能重试机制
go
// RetryConfig controls retryWithBackoff.
type RetryConfig struct {
	MaxAttempts int           // total number of calls to fn; values below 1 are treated as 1
	BaseDelay   time.Duration // wait before the second attempt
	MaxDelay    time.Duration // cap applied to the delay after each increase
	Multiplier  float64       // factor applied to the delay after each wait
}

// retryWithBackoff calls fn until it succeeds, returns an error that
// classifyError marks as non-retryable, or the attempt budget is used up.
// Between attempts it sleeps, multiplying the delay by config.Multiplier and
// capping it at config.MaxDelay.
//
// It returns nil on success. Otherwise it returns an error that wraps the last
// error from fn and states how many attempts were actually made.
func retryWithBackoff(fn func() error, config RetryConfig) error {
	// Always run fn at least once so a zero-value config cannot produce a
	// failure that wraps a nil error.
	maxAttempts := config.MaxAttempts
	if maxAttempts < 1 {
		maxAttempts = 1
	}

	var lastErr error
	attempts := 0
	delay := config.BaseDelay
	for attempts < maxAttempts {
		if attempts > 0 {
			time.Sleep(delay)
			// Exponential backoff, bounded by MaxDelay.
			delay = time.Duration(float64(delay) * config.Multiplier)
			if delay > config.MaxDelay {
				delay = config.MaxDelay
			}
		}
		attempts++

		lastErr = fn()
		if lastErr == nil {
			return nil
		}
		// Stop early when the error is not worth retrying.
		if !classifyError(lastErr).Retry {
			break
		}
	}
	return fmt.Errorf("重试 %d 次后失败: %w", attempts, lastErr)
}
// 使用示例
func getPackageWithRetry(client api.PyPIClient, packageName string) (*models.Package, error) {
var result *models.Package
err := retryWithBackoff(func() error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pkg, err := client.GetPackageInfo(ctx, packageName)
if err != nil {
return err
}
result = pkg
return nil
}, RetryConfig{
MaxAttempts: 3,
BaseDelay: 1 * time.Second,
MaxDelay: 10 * time.Second,
Multiplier: 2.0,
})
return result, err
}并发控制
1. 限制并发数
go
// ConcurrencyLimiter bounds how many callers may hold a slot at the same time,
// using a buffered channel as a counting semaphore.
type ConcurrencyLimiter struct {
	semaphore chan struct{}
}

// NewConcurrencyLimiter returns a limiter that admits at most limit concurrent
// holders. A limit below 1 is raised to 1: a zero-capacity channel would make
// Acquire block forever and a negative capacity would panic.
func NewConcurrencyLimiter(limit int) *ConcurrencyLimiter {
	if limit < 1 {
		limit = 1
	}
	return &ConcurrencyLimiter{
		semaphore: make(chan struct{}, limit),
	}
}

// Acquire takes a slot, blocking while all slots are in use.
func (cl *ConcurrencyLimiter) Acquire() {
	cl.semaphore <- struct{}{}
}

// Release frees a slot previously taken with Acquire.
func (cl *ConcurrencyLimiter) Release() {
	<-cl.semaphore
}

// Do runs fn while holding a slot; the slot is released even if fn panics.
func (cl *ConcurrencyLimiter) Do(fn func()) {
	cl.Acquire()
	defer cl.Release()
	fn()
}
// 使用示例
func batchProcessPackages(client api.PyPIClient, packages []string) {
limiter := NewConcurrencyLimiter(5) // 最多5个并发
var wg sync.WaitGroup
for _, pkg := range packages {
wg.Add(1)
go func(packageName string) {
defer wg.Done()
limiter.Do(func() {
processPackage(client, packageName)
})
}(pkg)
}
wg.Wait()
}2. 速率限制
go
type RateLimiter struct {
ticker *time.Ticker
requests chan struct{}
}
func NewRateLimiter(requestsPerSecond int) *RateLimiter {
rl := &RateLimiter{
ticker: time.NewTicker(time.Second / time.Duration(requestsPerSecond)),
requests: make(chan struct{}, requestsPerSecond),
}
// 填充初始令牌
for i := 0; i < requestsPerSecond; i++ {
rl.requests <- struct{}{}
}
// 定期补充令牌
go func() {
for range rl.ticker.C {
select {
case rl.requests <- struct{}{}:
default:
// 令牌桶已满
}
}
}()
return rl
}
func (rl *RateLimiter) Wait() {
<-rl.requests
}
func (rl *RateLimiter) Close() {
rl.ticker.Stop()
}缓存策略
1. 内存缓存
go
type MemoryCache struct {
data map[string]*CacheItem
mutex sync.RWMutex
maxAge time.Duration
}
type CacheItem struct {
Value interface{}
ExpiresAt time.Time
}
func NewMemoryCache(maxAge time.Duration) *MemoryCache {
cache := &MemoryCache{
data: make(map[string]*CacheItem),
maxAge: maxAge,
}
// 定期清理过期项
go cache.cleanup()
return cache
}
func (mc *MemoryCache) Get(key string) (interface{}, bool) {
mc.mutex.RLock()
defer mc.mutex.RUnlock()
item, exists := mc.data[key]
if !exists || time.Now().After(item.ExpiresAt) {
return nil, false
}
return item.Value, true
}
func (mc *MemoryCache) Set(key string, value interface{}) {
mc.mutex.Lock()
defer mc.mutex.Unlock()
mc.data[key] = &CacheItem{
Value: value,
ExpiresAt: time.Now().Add(mc.maxAge),
}
}
func (mc *MemoryCache) cleanup() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
mc.mutex.Lock()
now := time.Now()
for key, item := range mc.data {
if now.After(item.ExpiresAt) {
delete(mc.data, key)
}
}
mc.mutex.Unlock()
}
}2. 缓存装饰器
go
type CachedClient struct {
client api.PyPIClient
cache *MemoryCache
}
func NewCachedClient(client api.PyPIClient, cacheMaxAge time.Duration) *CachedClient {
return &CachedClient{
client: client,
cache: NewMemoryCache(cacheMaxAge),
}
}
func (cc *CachedClient) GetPackageInfo(ctx context.Context, packageName string) (*models.Package, error) {
cacheKey := fmt.Sprintf("package:%s", packageName)
// 尝试从缓存获取
if cached, found := cc.cache.Get(cacheKey); found {
return cached.(*models.Package), nil
}
// 缓存未命中,从API获取
pkg, err := cc.client.GetPackageInfo(ctx, packageName)
if err != nil {
return nil, err
}
// 存入缓存
cc.cache.Set(cacheKey, pkg)
return pkg, nil
}监控和日志
1. 结构化日志
go
import "log/slog"
type LoggedClient struct {
client api.PyPIClient
logger *slog.Logger
}
func NewLoggedClient(client api.PyPIClient, logger *slog.Logger) *LoggedClient {
return &LoggedClient{
client: client,
logger: logger,
}
}
func (lc *LoggedClient) GetPackageInfo(ctx context.Context, packageName string) (*models.Package, error) {
start := time.Now()
lc.logger.Info("开始获取包信息",
slog.String("package", packageName),
slog.String("operation", "GetPackageInfo"))
pkg, err := lc.client.GetPackageInfo(ctx, packageName)
duration := time.Since(start)
if err != nil {
lc.logger.Error("获取包信息失败",
slog.String("package", packageName),
slog.String("error", err.Error()),
slog.Duration("duration", duration))
return nil, err
}
lc.logger.Info("成功获取包信息",
slog.String("package", packageName),
slog.String("version", pkg.Info.Version),
slog.Duration("duration", duration))
return pkg, nil
}2. 性能指标收集
go
// Metrics accumulates request counters and total latency behind a mutex.
type Metrics struct {
	RequestCount  int64
	ErrorCount    int64
	TotalDuration time.Duration
	mutex         sync.RWMutex
}

// RecordRequest counts one request that took duration; a non-nil err also
// counts it as an error.
func (m *Metrics) RecordRequest(duration time.Duration, err error) {
	failed := err != nil

	m.mutex.Lock()
	m.RequestCount++
	m.TotalDuration += duration
	if failed {
		m.ErrorCount++
	}
	m.mutex.Unlock()
}

// GetStats returns the request count, the error count and the mean request
// duration. The mean is zero while no requests have been recorded.
func (m *Metrics) GetStats() (requests int64, errors int64, avgDuration time.Duration) {
	m.mutex.RLock()
	total := m.TotalDuration
	requests, errors = m.RequestCount, m.ErrorCount
	m.mutex.RUnlock()

	if requests <= 0 {
		return requests, errors, 0
	}
	return requests, errors, total / time.Duration(requests)
}
type MetricsClient struct {
client api.PyPIClient
metrics *Metrics
}
func NewMetricsClient(client api.PyPIClient) *MetricsClient {
return &MetricsClient{
client: client,
metrics: &Metrics{},
}
}
func (mc *MetricsClient) GetPackageInfo(ctx context.Context, packageName string) (*models.Package, error) {
start := time.Now()
pkg, err := mc.client.GetPackageInfo(ctx, packageName)
duration := time.Since(start)
mc.metrics.RecordRequest(duration, err)
return pkg, err
}安全考虑
1. 输入验证
go
// packageNamePattern accepts names that start and end with an ASCII letter or
// digit and contain only letters, digits, '.', '_' and '-' in between.
// It is compiled once at package scope instead of on every call.
var packageNamePattern = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$`)

// validatePackageName checks that name is non-empty, at most 214 bytes long
// and matches packageNamePattern. It returns nil for a valid name and a
// descriptive error otherwise.
func validatePackageName(name string) error {
	switch {
	case name == "":
		return fmt.Errorf("包名不能为空")
	case len(name) > 214:
		return fmt.Errorf("包名过长")
	case !packageNamePattern.MatchString(name):
		// Package-name format rule.
		return fmt.Errorf("包名格式无效")
	}
	return nil
}
func safeGetPackageInfo(client api.PyPIClient, ctx context.Context, packageName string) (*models.Package, error) {
if err := validatePackageName(packageName); err != nil {
return nil, fmt.Errorf("输入验证失败: %w", err)
}
return client.GetPackageInfo(ctx, packageName)
}2. 敏感信息处理
go
func sanitizeUserAgent(userAgent string) string {
// 移除可能的敏感信息
re := regexp.MustCompile(`\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b`) // IP地址
userAgent = re.ReplaceAllString(userAgent, "[IP]")
re = regexp.MustCompile(`[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}`) // 邮箱
userAgent = re.ReplaceAllString(userAgent, "[EMAIL]")
return userAgent
}生产环境部署
1. 配置管理
go
// Config holds the deployment settings, each read from an environment
// variable.
//
// NOTE(review): the tags assume env is github.com/caarlos0/env (the package
// whose Parse function takes a struct pointer). That library reads fallback
// values from the `envDefault` tag; a `default` tag is ignored, which would
// leave every unset field at its zero value. Confirm against the real import.
type Config struct {
	Mirror      string        `env:"PYPI_MIRROR" envDefault:"tsinghua"`
	Timeout     time.Duration `env:"PYPI_TIMEOUT" envDefault:"30s"`
	MaxRetries  int           `env:"PYPI_MAX_RETRIES" envDefault:"3"`
	Concurrency int           `env:"PYPI_CONCURRENCY" envDefault:"5"`
	CacheMaxAge time.Duration `env:"PYPI_CACHE_MAX_AGE" envDefault:"1h"`
	LogLevel    string        `env:"LOG_LEVEL" envDefault:"info"`
}

// LoadConfig populates a Config from environment variables and returns it, or
// the error reported by env.Parse.
func LoadConfig() (*Config, error) {
	var config Config
	// Load the configuration from environment variables.
	if err := env.Parse(&config); err != nil {
		return nil, err
	}
	return &config, nil
}
func CreateClientFromConfig(config *Config) api.PyPIClient {
options := client.NewOptions().
WithTimeout(config.Timeout).
WithMaxRetries(config.MaxRetries)
switch config.Mirror {
case "tsinghua":
return mirrors.NewTsinghuaClient(options)
case "aliyun":
return mirrors.NewAliyunClient(options)
default:
return mirrors.NewOfficialClient(options)
}
}2. 健康检查
go
// HealthCheck probes the client by requesting the well-known "requests"
// package with a 5-second deadline. It returns nil when the lookup succeeds
// and the lookup error otherwise.
func HealthCheck(client api.PyPIClient) error {
	const probeTimeout = 5 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
	defer cancel()

	if _, err := client.GetPackageInfo(ctx, "requests"); err != nil {
		return err
	}
	return nil
}
func StartHealthCheckServer(client api.PyPIClient, port int) {
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
if err := HealthCheck(client); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprintf(w, "Health check failed: %v", err)
return
}
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, "OK")
})
log.Printf("Health check server starting on port %d", port)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
}3. 优雅关闭
go
func GracefulShutdown(cancel context.CancelFunc) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("收到关闭信号,开始优雅关闭...")
cancel()
}()
}下一步: 查看 常见问题 了解常见问题的解决方案。