Skip to content

高级用法示例

本页面展示 NPM Crawler 的高级使用场景和技巧。每个示例都是一个独立的完整程序(各自声明了 package main 和 main 函数),请将它们分别保存到不同的目录中运行,不要合并到同一个包里。

示例 1: 缓存实现

go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/scagogogo/npm-crawler/pkg/models"
    "github.com/scagogogo/npm-crawler/pkg/registry"
)

// Package-info cache types.
type (
    // PackageCache is an in-memory, TTL-based cache of package metadata keyed
    // by package name. The map is only touched while holding mutex.
    PackageCache struct {
        cache map[string]*CacheEntry
        mutex sync.RWMutex
    }

    // CacheEntry is one cached package plus the moment it was stored and how
    // long it remains valid after that moment.
    CacheEntry struct {
        Package   *models.Package
        Timestamp time.Time
        TTL       time.Duration
    }
)

// NewPackageCache returns an empty PackageCache ready for use.
func NewPackageCache() *PackageCache {
    c := new(PackageCache)
    c.cache = make(map[string]*CacheEntry)
    return c
}

// Get returns the cached package for packageName and true when an entry
// exists and is not older than its TTL; otherwise it returns nil, false.
//
// Expired entries are evicted on lookup. Without this, stale entries would
// accumulate in the map for the lifetime of the cache.
func (c *PackageCache) Get(packageName string) (*models.Package, bool) {
    c.mutex.RLock()
    entry, exists := c.cache[packageName]
    c.mutex.RUnlock()

    if !exists {
        return nil, false
    }

    // Set always stores a fresh *CacheEntry and never mutates an existing one,
    // so reading entry's fields after releasing the lock is safe.
    if time.Since(entry.Timestamp) <= entry.TTL {
        return entry.Package, true
    }

    // Expired: evict it, but only if nobody replaced it with a fresh entry
    // between the read lock above and the write lock here.
    c.mutex.Lock()
    if current, ok := c.cache[packageName]; ok && current == entry {
        delete(c.cache, packageName)
    }
    c.mutex.Unlock()

    return nil, false
}

// Set stores pkg under packageName, stamped with the current time and valid
// for ttl. Any existing entry for the same name is replaced.
func (c *PackageCache) Set(packageName string, pkg *models.Package, ttl time.Duration) {
    c.mutex.Lock()
    defer c.mutex.Unlock()

    entry := &CacheEntry{TTL: ttl, Package: pkg, Timestamp: time.Now()}
    c.cache[packageName] = entry
}

// CachedClient wraps a registry client with a PackageCache so that repeated
// lookups of the same package can be answered from memory.
type CachedClient struct {
    client *registry.Registry
    cache  *PackageCache
}

// NewCachedClient builds a CachedClient backed by a default registry client
// and an empty cache.
func NewCachedClient() *CachedClient {
    reg := registry.NewRegistry()
    store := NewPackageCache()
    return &CachedClient{client: reg, cache: store}
}

// GetPackageInformation returns metadata for packageName, consulting the
// cache first. On a miss it fetches from the registry and caches the result
// for 10 minutes. Fetch errors are returned unchanged and nothing is cached.
func (cc *CachedClient) GetPackageInformation(ctx context.Context, packageName string) (*models.Package, error) {
    cached, ok := cc.cache.Get(packageName)
    if ok {
        fmt.Printf("缓存命中: %s\n", packageName)
        return cached, nil
    }

    // Cache miss: go to the network.
    fmt.Printf("从网络获取: %s\n", packageName)
    fresh, err := cc.client.GetPackageInformation(ctx, packageName)
    if err != nil {
        return nil, err
    }

    const ttl = 10 * time.Minute
    cc.cache.Set(packageName, fresh, ttl)
    return fresh, nil
}

// main looks up a list that repeats "react", so the later lookups of it are
// served from the cache while the first one goes to the network.
func main() {
    client := NewCachedClient()
    ctx := context.Background()

    names := []string{"react", "vue", "react", "angular", "react"}
    for i := 0; i < len(names); i++ {
        name := names[i]
        info, err := client.GetPackageInformation(ctx, name)
        if err != nil {
            fmt.Printf("错误: %v\n", err)
            continue
        }
        fmt.Printf("包: %s, 版本: %s\n", info.Name, info.DistTags["latest"])
    }
}

示例 2: 重试机制

go
package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "math"
    "time"

    "github.com/scagogogo/npm-crawler/pkg/models"
    "github.com/scagogogo/npm-crawler/pkg/registry"
)

// RetryConfig controls the exponential backoff used by RetryClient.
type RetryConfig struct {
    MaxRetries int           // retries allowed after the first attempt
    BaseDelay  time.Duration // delay before the first retry
    MaxDelay   time.Duration // upper bound on any single delay
    Multiplier float64       // growth factor applied per retry
}

// DefaultRetryConfig retries up to 3 times, waiting 1s, 2s and then 4s
// (each delay capped at 30s).
var DefaultRetryConfig = RetryConfig{
    MaxRetries: 3,
    BaseDelay:  time.Second,
    MaxDelay:   30 * time.Second,
    Multiplier: 2.0,
}

// RetryClient wraps a registry client and retries failed lookups with
// exponential backoff as described by config.
type RetryClient struct {
    client *registry.Registry
    config RetryConfig
}

// NewRetryClient returns a RetryClient that uses a default registry client
// and the given retry configuration.
func NewRetryClient(config RetryConfig) *RetryClient {
    rc := &RetryClient{client: registry.NewRegistry()}
    rc.config = config
    return rc
}

// GetPackageInformation fetches packageName and retries failed attempts with
// exponential backoff (see calculateDelay), up to config.MaxRetries times.
//
// If ctx is done while waiting between attempts, it returns ctx.Err() right
// away. It stops retrying as soon as isNonRetryableError reports true. The
// final error wraps the last underlying error with %w and reports how many
// retries were actually performed, which can be fewer than MaxRetries after an
// early stop. A negative MaxRetries is treated as 0, meaning one attempt.
func (rc *RetryClient) GetPackageInformation(ctx context.Context, packageName string) (*models.Package, error) {
    maxRetries := rc.config.MaxRetries
    if maxRetries < 0 {
        // Without this the loop would not run at all and we would wrap a nil error.
        maxRetries = 0
    }

    var (
        lastErr error
        retries int // retries actually performed, for the final error message
    )
    for attempt := 0; attempt <= maxRetries; attempt++ {
        if attempt > 0 {
            delay := rc.calculateDelay(attempt)
            fmt.Printf("第 %d 次重试 %s,等待 %v\n", attempt, packageName, delay)

            select {
            case <-time.After(delay):
            case <-ctx.Done():
                return nil, ctx.Err()
            }
        }

        pkg, err := rc.client.GetPackageInformation(ctx, packageName)
        if err == nil {
            if attempt > 0 {
                fmt.Printf("重试成功: %s\n", packageName)
            }
            return pkg, nil
        }

        lastErr = err
        retries = attempt

        // Some errors will not go away by retrying; give up immediately.
        if isNonRetryableError(err) {
            break
        }
    }

    return nil, fmt.Errorf("重试 %d 次后仍然失败: %w", retries, lastErr)
}

// calculateDelay returns the wait before retry number attempt (1-based):
// BaseDelay * Multiplier^(attempt-1), capped at MaxDelay.
//
// The value is computed and clamped in float64 before converting to a
// Duration. For large attempt counts or multipliers the product can exceed
// the int64 range, and per the Go spec such a float-to-integer conversion has
// an implementation-dependent result (in practice often a negative number).
// A negative number would slip past the MaxDelay check and disable the
// backoff entirely.
func (rc *RetryClient) calculateDelay(attempt int) time.Duration {
    delay := float64(rc.config.BaseDelay) * math.Pow(rc.config.Multiplier, float64(attempt-1))

    // NaN (e.g. 0 * +Inf) compares false against everything, so check it explicitly.
    if math.IsNaN(delay) || delay > float64(rc.config.MaxDelay) {
        return rc.config.MaxDelay
    }

    return time.Duration(delay)
}

// isNonRetryableError reports whether err should end the retry loop at once
// instead of triggering another attempt. Currently only context cancellation
// is treated as terminal. errors.Is is used, so wrapped cancellations count too.
func isNonRetryableError(err error) bool {
    canceled := errors.Is(err, context.Canceled)
    // Further non-retryable error kinds can be added here.
    return canceled
}

// main runs RetryClient against one package that should exist and one that
// presumably does not. The missing package is expected to fail and so go
// through the backoff path.
func main() {
    client := NewRetryClient(DefaultRetryConfig)
    ctx := context.Background()

    // A package that should resolve normally.
    if pkg, err := client.GetPackageInformation(ctx, "react"); err != nil {
        log.Printf("获取 react 失败: %v", err)
    } else {
        fmt.Printf("成功获取: %s v%s\n", pkg.Name, pkg.DistTags["latest"])
    }

    // A package that does not exist (triggers retries).
    if _, err := client.GetPackageInformation(ctx, "nonexistent-package-12345"); err != nil {
        log.Printf("获取不存在的包失败(预期): %v", err)
    }
}

示例 3: 连接池和性能优化

go
package main

import (
    "context"
    "fmt"
    "net/http"
    "sync"
    "time"

    "github.com/scagogogo/npm-crawler/pkg/registry"
)

// ConnectionManager hands out one registry client per registry URL and
// reuses it on later calls for the same URL.
type ConnectionManager struct {
    clients map[string]*registry.Registry
    mutex   sync.RWMutex
}

// NewConnectionManager returns a ConnectionManager that holds no clients yet.
func NewConnectionManager() *ConnectionManager {
    cm := &ConnectionManager{}
    cm.clients = make(map[string]*registry.Registry)
    return cm
}

// GetClient returns the client for registryURL, creating and remembering one
// on first use.
//
// The common path takes only a read lock. The creation path re-checks under
// the write lock, so concurrent first-time callers for the same URL end up
// sharing a single client.
func (cm *ConnectionManager) GetClient(registryURL string) *registry.Registry {
    cm.mutex.RLock()
    cached, ok := cm.clients[registryURL]
    cm.mutex.RUnlock()
    if ok {
        return cached
    }

    cm.mutex.Lock()
    defer cm.mutex.Unlock()

    // Another goroutine may have created it between the two locks.
    if cached, ok = cm.clients[registryURL]; ok {
        return cached
    }

    created := registry.NewRegistry(registry.NewOptions().SetRegistryURL(registryURL))
    cm.clients[registryURL] = created
    return created
}

// LoadBalancedClient spreads requests across several registry URLs in
// round-robin order. It reuses one client per URL via a ConnectionManager.
type LoadBalancedClient struct {
    clients []string // registry URLs, visited in order
    manager *ConnectionManager
    current int // index of the next URL to use; guarded by mutex
    mutex   sync.Mutex
}

// NewLoadBalancedClient returns a client that rotates through registryURLs.
//
// The slice is copied, so later writes to it by the caller cannot race with
// the rotation. The rotation reads the slice under lbc.mutex, which the
// caller does not hold.
func NewLoadBalancedClient(registryURLs []string) *LoadBalancedClient {
    return &LoadBalancedClient{
        clients: append([]string(nil), registryURLs...),
        manager: NewConnectionManager(),
    }
}

// getNextClient returns the client for the next URL in round-robin order.
// Only the index bookkeeping happens under lbc.mutex. Client lookup and
// creation is delegated to the ConnectionManager, which does its own locking.
func (lbc *LoadBalancedClient) getNextClient() *registry.Registry {
    lbc.mutex.Lock()
    target := lbc.clients[lbc.current]
    next := lbc.current + 1
    lbc.current = next % len(lbc.clients)
    lbc.mutex.Unlock()

    return lbc.manager.GetClient(target)
}

// GetPackageInformation fetches packageName from the next URL in the rotation
// and prints which registry served it.
//
// It returns an error instead of panicking when the client was built with no
// registry URLs.
func (lbc *LoadBalancedClient) GetPackageInformation(ctx context.Context, packageName string) error {
    // getNextClient indexes into lbc.clients and takes a modulo by its length.
    // With an empty list it would panic while still holding its mutex. The
    // slice is never reassigned after construction, so this unlocked len is safe.
    if len(lbc.clients) == 0 {
        return fmt.Errorf("没有配置任何镜像源")
    }

    client := lbc.getNextClient()

    pkg, err := client.GetPackageInformation(ctx, packageName)
    if err != nil {
        return err
    }

    fmt.Printf("从 %s 获取: %s v%s\n",
        client.GetOptions().RegistryURL,
        pkg.Name,
        pkg.DistTags["latest"])

    return nil
}

// main fetches several packages concurrently. LoadBalancedClient rotates the
// requests across the configured mirrors.
//
// NOTE(review): this example imports "net/http" but never uses it. Go rejects
// unused imports, so remove that import (or actually use it) before compiling.
func main() {
    // Round-robin here has no failover, so every listed mirror must be live.
    // registry.npm.taobao.org is the legacy domain of registry.npmmirror.com
    // (already listed) and has been retired, so the Huawei Cloud mirror is
    // used as the third entry instead.
    registryURLs := []string{
        "https://registry.npmjs.org",
        "https://registry.npmmirror.com",
        "https://mirrors.huaweicloud.com/repository/npm",
    }

    client := NewLoadBalancedClient(registryURLs)

    // Bound the whole batch so a slow or unreachable mirror cannot hang the
    // program indefinitely.
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    packages := []string{"react", "vue", "angular", "lodash", "express", "axios"}

    var wg sync.WaitGroup
    for _, pkg := range packages {
        wg.Add(1)
        go func(packageName string) {
            defer wg.Done()

            err := client.GetPackageInformation(ctx, packageName)
            if err != nil {
                fmt.Printf("获取 %s 失败: %v\n", packageName, err)
            }
        }(pkg)
    }

    wg.Wait()
}

示例 4: 限流器

go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/scagogogo/npm-crawler/pkg/registry"
    "golang.org/x/time/rate"
)

// RateLimitedClient throttles registry lookups with a rate.Limiter
// (a token bucket).
type RateLimitedClient struct {
    client  *registry.Registry
    limiter *rate.Limiter
}

// NewRateLimitedClient allows requestsPerSecond lookups per second on
// average, with a burst of the same size.
func NewRateLimitedClient(requestsPerSecond int) *RateLimitedClient {
    perSecond := rate.Limit(requestsPerSecond)
    return &RateLimitedClient{
        client:  registry.NewRegistry(),
        limiter: rate.NewLimiter(perSecond, requestsPerSecond),
    }
}

// GetPackageInformation waits for the limiter to grant a token, then fetches
// packageName and prints its latest version. If ctx ends first, it returns
// the error from Limiter.Wait.
func (rlc *RateLimitedClient) GetPackageInformation(ctx context.Context, packageName string) error {
    err := rlc.limiter.Wait(ctx)
    if err != nil {
        return err
    }

    pkg, err := rlc.client.GetPackageInformation(ctx, packageName)
    if err == nil {
        fmt.Printf("✅ %s: v%s\n", pkg.Name, pkg.DistTags["latest"])
    }
    return err
}

// main launches ten lookups at once through a client limited to 2 requests
// per second, then reports the total elapsed time.
func main() {
    const perSecond = 2
    client := NewRateLimitedClient(perSecond)
    ctx := context.Background()

    packages := []string{
        "react", "vue", "angular", "lodash", "express",
        "axios", "moment", "underscore", "jquery", "bootstrap",
    }

    start := time.Now()

    var wg sync.WaitGroup
    wg.Add(len(packages))
    for i := range packages {
        go func(index int, packageName string) {
            defer wg.Done()

            fmt.Printf("[%d] 开始请求: %s\n", index, packageName)
            if err := client.GetPackageInformation(ctx, packageName); err != nil {
                fmt.Printf("❌ [%d] %s: %v\n", index, packageName, err)
            }
        }(i, packages[i])
    }
    wg.Wait()

    fmt.Printf("\n总耗时: %v\n", time.Since(start))
}

示例 5: 健康检查和故障转移

go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/scagogogo/npm-crawler/pkg/registry"
)

// MirrorStatus holds the most recent health-check result for one mirror.
type MirrorStatus struct {
    URL       string
    Healthy   bool
    LastCheck time.Time     // when the last check was recorded
    Latency   time.Duration // how long the last check took
}

// HealthChecker tracks the health of a fixed set of mirror URLs.
type HealthChecker struct {
    mirrors map[string]*MirrorStatus
    mutex   sync.RWMutex
}

// NewHealthChecker registers every URL in urls. Each mirror is marked healthy
// until a check says otherwise. Duplicate URLs collapse to one entry.
func NewHealthChecker(urls []string) *HealthChecker {
    statuses := make(map[string]*MirrorStatus)
    for _, u := range urls {
        statuses[u] = &MirrorStatus{URL: u, Healthy: true}
    }
    return &HealthChecker{mirrors: statuses}
}

// CheckHealth probes every registered mirror concurrently and returns once
// every probe has finished.
//
// The map is ranged without the lock. In this file its key set is only
// populated in NewHealthChecker; probes change entry fields under the mutex,
// not the keys.
func (hc *HealthChecker) CheckHealth(ctx context.Context) {
    var pending sync.WaitGroup
    pending.Add(len(hc.mirrors))
    for mirrorURL := range hc.mirrors {
        go func(u string) {
            defer pending.Done()
            hc.checkSingleMirror(ctx, u)
        }(mirrorURL)
    }
    pending.Wait()
}

// checkSingleMirror probes one mirror by fetching the "lodash" package and
// records the outcome in hc.mirrors[url]. It is healthy exactly when the
// fetch returns no error.
//
// Each probe is bounded by healthCheckTimeout. Without it, an unreachable
// mirror could stall CheckHealth (which waits for every probe) indefinitely.
// Latency covers only the request itself: it is measured before taking the
// lock, so it excludes time spent waiting on other probes.
func (hc *HealthChecker) checkSingleMirror(ctx context.Context, url string) {
    const healthCheckTimeout = 10 * time.Second

    ctx, cancel := context.WithTimeout(ctx, healthCheckTimeout)
    defer cancel()

    client := registry.NewRegistry(registry.NewOptions().SetRegistryURL(url))

    // Use a simple package lookup as the health probe.
    start := time.Now()
    _, err := client.GetPackageInformation(ctx, "lodash")
    latency := time.Since(start)

    hc.mutex.Lock()
    defer hc.mutex.Unlock()

    status := hc.mirrors[url]
    status.LastCheck = time.Now()
    status.Latency = latency
    status.Healthy = (err == nil)

    if status.Healthy {
        fmt.Printf("✅ %s: 健康 (延迟: %v)\n", url, status.Latency)
    } else {
        fmt.Printf("❌ %s: 不健康 - %v\n", url, err)
    }
}

// GetHealthyMirrors returns the URLs currently marked healthy. The order is
// unspecified because it follows map iteration. The result is nil when no
// mirror is healthy.
func (hc *HealthChecker) GetHealthyMirrors() []string {
    hc.mutex.RLock()
    defer hc.mutex.RUnlock()

    var out []string
    for u, st := range hc.mirrors {
        if !st.Healthy {
            continue
        }
        out = append(out, u)
    }
    return out
}

// GetBestMirror returns the healthy mirror with the lowest recorded latency,
// or "" when no mirror is healthy.
//
// Ties are broken by map iteration order. This includes the all-zero
// latencies seen before any check has run.
func (hc *HealthChecker) GetBestMirror() string {
    hc.mutex.RLock()
    defer hc.mutex.RUnlock()

    var (
        best        string
        bestLatency time.Duration
        found       bool
    )
    for url, status := range hc.mirrors {
        if !status.Healthy {
            continue
        }
        // The first healthy mirror seeds the comparison. A fixed "very large"
        // upper bound would silently skip healthy mirrors slower than it.
        if !found || status.Latency < bestLatency {
            best, bestLatency, found = url, status.Latency, true
        }
    }

    return best
}

// defaultHealthInterval is the minimum time between background health checks
// triggered by AdaptiveClient requests. It also bounds how long each
// background check may run.
const defaultHealthInterval = 30 * time.Second

// AdaptiveClient sends each request to the best mirror as judged by its
// HealthChecker: healthy, with the lowest latency. It keeps one registry
// client per mirror URL.
type AdaptiveClient struct {
    checker *HealthChecker
    clients map[string]*registry.Registry
    mutex   sync.RWMutex // guards clients

    // healthInterval throttles background health checks; <= 0 means
    // defaultHealthInterval.
    healthInterval time.Duration
    checkMu        sync.Mutex // guards lastCheck and checking
    lastCheck      time.Time  // when the most recent background check was started
    checking       bool       // a background check is currently running
}

// NewAdaptiveClient builds an AdaptiveClient for the given mirror URLs.
func NewAdaptiveClient(urls []string) *AdaptiveClient {
    return &AdaptiveClient{
        checker:        NewHealthChecker(urls),
        clients:        make(map[string]*registry.Registry),
        healthInterval: defaultHealthInterval,
    }
}

// getClient returns the registry client for url, creating and caching one on
// first use. It takes a read lock on the fast path and re-checks under the
// write lock before creating.
func (ac *AdaptiveClient) getClient(url string) *registry.Registry {
    ac.mutex.RLock()
    client, exists := ac.clients[url]
    ac.mutex.RUnlock()

    if exists {
        return client
    }

    ac.mutex.Lock()
    defer ac.mutex.Unlock()

    // Another goroutine may have created it between the two locks.
    if client, exists := ac.clients[url]; exists {
        return client
    }

    options := registry.NewOptions().SetRegistryURL(url)
    client = registry.NewRegistry(options)
    ac.clients[url] = client

    return client
}

// maybeRefreshHealth starts a background health check, unless one is already
// running or the previous one started less than the health interval ago.
//
// The check runs on its own time-bounded context, detached from any request
// context. A cancelled request therefore cannot abort the probes and mark
// every mirror unhealthy.
func (ac *AdaptiveClient) maybeRefreshHealth() {
    interval := ac.healthInterval
    if interval <= 0 {
        interval = defaultHealthInterval
    }

    ac.checkMu.Lock()
    if ac.checking || time.Since(ac.lastCheck) < interval {
        ac.checkMu.Unlock()
        return
    }
    ac.checking = true
    ac.lastCheck = time.Now()
    ac.checkMu.Unlock()

    go func() {
        defer func() {
            ac.checkMu.Lock()
            ac.checking = false
            ac.checkMu.Unlock()
        }()

        ctx, cancel := context.WithTimeout(context.Background(), interval)
        defer cancel()
        ac.checker.CheckHealth(ctx)
    }()
}

// GetPackageInformation fetches packageName from the best mirror currently
// marked healthy, and prints which mirror served it. It returns an error when
// no mirror is healthy.
//
// Each call may kick off a background health refresh, at most one per health
// interval. The request itself uses whatever health data is available now.
func (ac *AdaptiveClient) GetPackageInformation(ctx context.Context, packageName string) error {
    ac.maybeRefreshHealth()

    // Pick the best mirror.
    bestMirror := ac.checker.GetBestMirror()
    if bestMirror == "" {
        return fmt.Errorf("没有可用的健康镜像")
    }

    client := ac.getClient(bestMirror)
    pkg, err := client.GetPackageInformation(ctx, packageName)
    if err != nil {
        return err
    }

    fmt.Printf("使用镜像 %s 获取: %s v%s\n",
        bestMirror,
        pkg.Name,
        pkg.DistTags["latest"])

    return nil
}

// main runs an initial synchronous health check across the mirrors, then
// fetches a few packages one per second through the adaptive client.
func main() {
    mirrors := []string{
        "https://registry.npmjs.org",
        "https://registry.npmmirror.com",
        "https://registry.npm.taobao.org",
        "https://mirrors.huaweicloud.com/repository/npm",
    }

    client := NewAdaptiveClient(mirrors)
    ctx := context.Background()

    // Health-check up front so the first request has data to choose from.
    fmt.Println("进行健康检查...")
    client.checker.CheckHealth(ctx)

    fmt.Println("\n开始获取包信息...")
    names := []string{"react", "vue", "angular", "lodash", "express"}
    for i := 0; i < len(names); i++ {
        if err := client.GetPackageInformation(ctx, names[i]); err != nil {
            fmt.Printf("获取 %s 失败: %v\n", names[i], err)
        }

        // Pace the requests.
        time.Sleep(time.Second)
    }
}

示例 6: 监控和指标收集

go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/scagogogo/npm-crawler/pkg/registry"
)

// Metrics accumulates request counts and total latency. RecordRequest and
// the getters access the counters under mutex.
type Metrics struct {
    TotalRequests      int64
    SuccessfulRequests int64
    FailedRequests     int64
    TotalLatency       time.Duration
    mutex              sync.RWMutex
}

// NewMetrics returns a Metrics with every counter at zero.
func NewMetrics() *Metrics {
    return new(Metrics)
}

// RecordRequest counts one request that took latency. It is classified as
// successful or failed according to success.
func (m *Metrics) RecordRequest(latency time.Duration, success bool) {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    m.TotalRequests++
    m.TotalLatency += latency

    counter := &m.FailedRequests
    if success {
        counter = &m.SuccessfulRequests
    }
    *counter++
}

// GetStats returns the counters, read under a single lock, together with the
// mean latency per request. The mean is 0 when nothing has been recorded.
func (m *Metrics) GetStats() (total, success, failed int64, avgLatency time.Duration) {
    m.mutex.RLock()
    defer m.mutex.RUnlock()

    if m.TotalRequests > 0 {
        avgLatency = m.TotalLatency / time.Duration(m.TotalRequests)
    }
    return m.TotalRequests, m.SuccessfulRequests, m.FailedRequests, avgLatency
}

// GetSuccessRate returns the percentage (0 to 100) of recorded requests that
// succeeded, or 0 when none have been recorded.
func (m *Metrics) GetSuccessRate() float64 {
    m.mutex.RLock()
    total, succeeded := m.TotalRequests, m.SuccessfulRequests
    m.mutex.RUnlock()

    if total == 0 {
        return 0
    }
    return float64(succeeded) / float64(total) * 100
}

// MonitoredClient wraps a registry client and records the latency and outcome
// of every lookup in a Metrics.
type MonitoredClient struct {
    client  *registry.Registry
    metrics *Metrics
}

// NewMonitoredClient returns a MonitoredClient with a default registry client
// and fresh, zeroed metrics.
func NewMonitoredClient() *MonitoredClient {
    mc := &MonitoredClient{client: registry.NewRegistry()}
    mc.metrics = NewMetrics()
    return mc
}

// GetPackageInformation fetches packageName and records how long the fetch
// took and whether it succeeded. It prints a one-line result and returns the
// fetch error, which is nil on success.
func (mc *MonitoredClient) GetPackageInformation(ctx context.Context, packageName string) error {
    began := time.Now()
    pkg, err := mc.client.GetPackageInformation(ctx, packageName)
    elapsed := time.Since(began)

    mc.metrics.RecordRequest(elapsed, err == nil)

    if err != nil {
        fmt.Printf("❌ %s: %v (耗时: %v)\n", packageName, err, elapsed)
        return err
    }

    fmt.Printf("✅ %s: v%s (耗时: %v)\n", pkg.Name, pkg.DistTags["latest"], elapsed)
    return nil
}

// PrintStats prints a summary of all requests recorded so far.
//
// Every figure, including the success rate, comes from a single GetStats
// snapshot. The printed numbers therefore agree with one another even while
// other goroutines are still recording requests.
func (mc *MonitoredClient) PrintStats() {
    total, success, failed, avgLatency := mc.metrics.GetStats()

    // Computed from the same snapshot rather than via GetSuccessRate. That
    // call takes the lock again and could observe newer counts.
    var successRate float64
    if total > 0 {
        successRate = float64(success) / float64(total) * 100
    }

    fmt.Printf("\n=== 统计信息 ===\n")
    fmt.Printf("总请求数: %d\n", total)
    fmt.Printf("成功请求: %d\n", success)
    fmt.Printf("失败请求: %d\n", failed)
    fmt.Printf("成功率: %.2f%%\n", successRate)
    fmt.Printf("平均延迟: %v\n", avgLatency)
}

// StartMonitoring prints statistics every interval from a background
// goroutine.
//
// It returns a stop function that ends the goroutine and releases the ticker.
// The stop function is safe to call multiple times. Callers that ignore the
// return value behave exactly as before. A non-positive interval disables
// monitoring and returns a no-op stop function; time.NewTicker would panic on
// such an interval.
func (mc *MonitoredClient) StartMonitoring(interval time.Duration) (stop func()) {
    if interval <= 0 {
        return func() {}
    }

    ticker := time.NewTicker(interval)
    done := make(chan struct{})

    go func() {
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                mc.PrintStats()
            case <-done:
                return
            }
        }
    }()

    var once sync.Once
    return func() { once.Do(func() { close(done) }) }
}

// main starts periodic statistics output, launches one lookup every 500ms
// (including two names that presumably do not exist), waits for all of them,
// and prints the final statistics.
func main() {
    client := NewMonitoredClient()
    ctx := context.Background()

    // Print running statistics every 10 seconds in the background.
    client.StartMonitoring(10 * time.Second)

    packages := []string{
        "react", "vue", "angular", "lodash", "express",
        "axios", "moment", "underscore", "jquery", "bootstrap",
        "nonexistent-package-1", "nonexistent-package-2", // deliberately missing packages
    }

    var wg sync.WaitGroup
    for _, name := range packages {
        wg.Add(1)
        go func(packageName string) {
            defer wg.Done()
            // The outcome is already printed and recorded by GetPackageInformation.
            _ = client.GetPackageInformation(ctx, packageName)
        }(name)

        // Space out the launches a little.
        time.Sleep(500 * time.Millisecond)
    }
    wg.Wait()

    // Final statistics.
    client.PrintStats()
}

运行高级示例

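运行这些高级示例需要以下依赖(其中 golang.org/x/time/rate 仅示例 4 使用;golang.org/x/sync/semaphore 目前没有任何示例用到,可以省略)。由于每个示例都有自己的 main 函数,请一次只把一个示例保存为 example.go: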

bash
go mod init npm-crawler-advanced
go get github.com/scagogogo/npm-crawler
go get golang.org/x/time/rate
go get golang.org/x/sync/semaphore
go run example.go

下一步

Released under the MIT License.