Skip to content

包管理示例

本页面提供了详细的包管理示例,展示如何使用 Go Pip SDK 进行各种包操作。

基本包操作

安装单个包

go
package main

import (
    "fmt"
    "log"
    "github.com/scagogogo/go-pip-sdk/pkg/pip"
)

func installSinglePackage() {
    manager := pip.NewManager(nil)
    
    // 基本安装
    pkg := &pip.PackageSpec{
        Name: "requests",
    }
    
    fmt.Printf("正在安装 %s...\n", pkg.Name)
    if err := manager.InstallPackage(pkg); err != nil {
        log.Fatalf("安装失败: %v", err)
    }
    
    fmt.Println("✅ 包安装成功!")
}

安装指定版本的包

go
func installSpecificVersion() {
    manager := pip.NewManager(nil)
    
    packages := []*pip.PackageSpec{
        // 精确版本
        {Name: "django", Version: "4.2.0"},
        
        // 版本范围
        {Name: "numpy", Version: ">=1.20.0,<2.0.0"},
        
        // 最小版本
        {Name: "requests", Version: ">=2.25.0"},
        
        // 兼容版本
        {Name: "click", Version: "~=7.1.0"},
    }
    
    for _, pkg := range packages {
        fmt.Printf("安装 %s %s...\n", pkg.Name, pkg.Version)
        if err := manager.InstallPackage(pkg); err != nil {
            fmt.Printf("❌ 安装 %s 失败: %v\n", pkg.Name, err)
            continue
        }
        fmt.Printf("✅ %s 安装成功\n", pkg.Name)
    }
}

安装带额外依赖的包

go
func installWithExtras() {
    manager := pip.NewManager(nil)
    
    // 安装 FastAPI 及其开发依赖
    pkg := &pip.PackageSpec{
        Name:   "fastapi",
        Extras: []string{"dev", "test"},
    }
    
    if err := manager.InstallPackage(pkg); err != nil {
        log.Fatalf("安装 FastAPI 失败: %v", err)
    }
    
    fmt.Println("✅ FastAPI 及开发依赖安装成功")
}

高级安装选项

可编辑安装

go
func editableInstall() {
    manager := pip.NewManager(nil)
    
    // 可编辑安装本地包
    pkg := &pip.PackageSpec{
        Name:     "./my-local-package",
        Editable: true,
    }
    
    if err := manager.InstallPackage(pkg); err != nil {
        log.Fatalf("可编辑安装失败: %v", err)
    }
    
    fmt.Println("✅ 本地包可编辑安装成功")
}

从 Git 仓库安装

go
func installFromGit() {
    manager := pip.NewManager(nil)
    
    packages := []*pip.PackageSpec{
        // 从 GitHub 安装
        {Name: "git+https://github.com/user/repo.git"},
        
        // 指定分支
        {Name: "git+https://github.com/user/repo.git@develop"},
        
        // 指定标签
        {Name: "git+https://github.com/user/repo.git@v1.0.0"},
        
        // 指定提交
        {Name: "git+https://github.com/user/repo.git@abc123"},
    }
    
    for _, pkg := range packages {
        fmt.Printf("从 Git 安装 %s...\n", pkg.Name)
        if err := manager.InstallPackage(pkg); err != nil {
            fmt.Printf("❌ 安装失败: %v\n", err)
            continue
        }
        fmt.Printf("✅ 安装成功\n")
    }
}

用户级安装

go
func userInstall() {
    manager := pip.NewManager(nil)
    
    // 用户级安装(避免权限问题)
    pkg := &pip.PackageSpec{
        Name:        "jupyter",
        UserInstall: true,
    }
    
    if err := manager.InstallPackage(pkg); err != nil {
        log.Fatalf("用户级安装失败: %v", err)
    }
    
    fmt.Println("✅ Jupyter 用户级安装成功")
}

批量包操作

批量安装

go
func batchInstall() {
    manager := pip.NewManager(nil)
    
    packages := []*pip.PackageSpec{
        {Name: "requests", Version: ">=2.25.0"},
        {Name: "click", Version: ">=7.0"},
        {Name: "pydantic", Version: ">=1.8.0"},
        {Name: "fastapi", Version: ">=0.68.0"},
        {Name: "uvicorn", Version: ">=0.15.0"},
    }
    
    fmt.Printf("开始批量安装 %d 个包...\n", len(packages))
    
    var successful []string
    var failed []string
    
    for _, pkg := range packages {
        fmt.Printf("正在安装 %s...\n", pkg.Name)
        if err := manager.InstallPackage(pkg); err != nil {
            failed = append(failed, pkg.Name)
            fmt.Printf("❌ %s 安装失败: %v\n", pkg.Name, err)
        } else {
            successful = append(successful, pkg.Name)
            fmt.Printf("✅ %s 安装成功\n", pkg.Name)
        }
    }
    
    fmt.Printf("\n安装完成: %d 成功, %d 失败\n", len(successful), len(failed))
    if len(failed) > 0 {
        fmt.Printf("失败的包: %v\n", failed)
    }
}

并发安装

go
func concurrentInstall() {
    manager := pip.NewManager(nil)
    
    packages := []*pip.PackageSpec{
        {Name: "requests"},
        {Name: "click"},
        {Name: "pydantic"},
        {Name: "fastapi"},
        {Name: "uvicorn"},
    }
    
    var wg sync.WaitGroup
    errChan := make(chan error, len(packages))
    semaphore := make(chan struct{}, 3) // 限制并发数为 3
    
    for _, pkg := range packages {
        wg.Add(1)
        go func(p *pip.PackageSpec) {
            defer wg.Done()
            
            // 获取信号量
            semaphore <- struct{}{}
            defer func() { <-semaphore }()
            
            fmt.Printf("正在安装 %s...\n", p.Name)
            if err := manager.InstallPackage(p); err != nil {
                errChan <- fmt.Errorf("安装 %s 失败: %w", p.Name, err)
            } else {
                fmt.Printf("✅ %s 安装成功\n", p.Name)
            }
        }(pkg)
    }
    
    wg.Wait()
    close(errChan)
    
    var errors []error
    for err := range errChan {
        errors = append(errors, err)
    }
    
    if len(errors) > 0 {
        fmt.Printf("并发安装中有 %d 个包失败:\n", len(errors))
        for _, err := range errors {
            fmt.Printf("- %v\n", err)
        }
    } else {
        fmt.Println("所有包并发安装成功!")
    }
}

包查询和信息

列出已安装的包

go
func listInstalledPackages() {
    manager := pip.NewManager(nil)
    
    packages, err := manager.ListPackages()
    if err != nil {
        log.Fatalf("列出包失败: %v", err)
    }
    
    fmt.Printf("已安装的包 (%d 个):\n", len(packages))
    fmt.Println("名称\t\t版本\t\t位置")
    fmt.Println("----\t\t----\t\t----")
    
    for _, pkg := range packages {
        fmt.Printf("%-20s %-15s %s\n", pkg.Name, pkg.Version, pkg.Location)
    }
}

搜索包

go
func searchPackages() {
    manager := pip.NewManager(nil)
    
    queries := []string{
        "web framework",
        "machine learning",
        "data analysis",
        "testing",
    }
    
    for _, query := range queries {
        fmt.Printf("\n搜索: %s\n", query)
        fmt.Println(strings.Repeat("=", 50))
        
        results, err := manager.SearchPackages(query)
        if err != nil {
            fmt.Printf("搜索失败: %v\n", err)
            continue
        }
        
        // 显示前5个结果
        for i, result := range results {
            if i >= 5 {
                break
            }
            fmt.Printf("%d. %s (%s)\n", i+1, result.Name, result.Version)
            fmt.Printf("   %s\n", result.Summary)
        }
        
        if len(results) > 5 {
            fmt.Printf("   ... 还有 %d 个结果\n", len(results)-5)
        }
    }
}

显示包详细信息

go
func showPackageDetails() {
    manager := pip.NewManager(nil)
    
    packageNames := []string{"requests", "django", "numpy"}
    
    for _, name := range packageNames {
        fmt.Printf("\n%s 包信息:\n", name)
        fmt.Println(strings.Repeat("=", 50))
        
        info, err := manager.ShowPackage(name)
        if err != nil {
            fmt.Printf("获取 %s 信息失败: %v\n", name, err)
            continue
        }
        
        fmt.Printf("名称: %s\n", info.Name)
        fmt.Printf("版本: %s\n", info.Version)
        fmt.Printf("摘要: %s\n", info.Summary)
        fmt.Printf("作者: %s\n", info.Author)
        fmt.Printf("许可证: %s\n", info.License)
        fmt.Printf("主页: %s\n", info.Homepage)
        
        if len(info.Requires) > 0 {
            fmt.Printf("依赖: %s\n", strings.Join(info.Requires, ", "))
        }
        
        if len(info.RequiredBy) > 0 {
            fmt.Printf("被依赖: %s\n", strings.Join(info.RequiredBy, ", "))
        }
    }
}

包更新和维护

检查过时的包

go
func checkOutdatedPackages() {
    manager := pip.NewManager(nil)
    
    fmt.Println("检查过时的包...")
    outdated, err := manager.CheckOutdated()
    if err != nil {
        log.Fatalf("检查过时包失败: %v", err)
    }
    
    if len(outdated) == 0 {
        fmt.Println("✅ 所有包都是最新的")
        return
    }
    
    fmt.Printf("发现 %d 个过时的包:\n", len(outdated))
    fmt.Println("包名\t\t当前版本\t最新版本")
    fmt.Println("----\t\t--------\t--------")
    
    for _, pkg := range outdated {
        fmt.Printf("%-20s %-15s %s\n", pkg.Name, pkg.CurrentVersion, pkg.LatestVersion)
    }
}

更新包

go
func updatePackages() {
    manager := pip.NewManager(nil)
    
    // 更新单个包
    fmt.Println("更新 requests 包...")
    if err := manager.UpdatePackage("requests"); err != nil {
        fmt.Printf("更新 requests 失败: %v\n", err)
    } else {
        fmt.Println("✅ requests 更新成功")
    }
    
    // 更新所有包
    fmt.Println("更新所有包...")
    if err := manager.UpdateAllPackages(); err != nil {
        fmt.Printf("更新所有包失败: %v\n", err)
    } else {
        fmt.Println("✅ 所有包更新成功")
    }
}

卸载包

go
func uninstallPackages() {
    manager := pip.NewManager(nil)
    
    packagesToRemove := []string{
        "old-package",
        "unused-dependency",
        "test-package",
    }
    
    for _, name := range packagesToRemove {
        fmt.Printf("卸载 %s...\n", name)
        if err := manager.UninstallPackage(name); err != nil {
            fmt.Printf("❌ 卸载 %s 失败: %v\n", name, err)
        } else {
            fmt.Printf("✅ %s 卸载成功\n", name)
        }
    }
}

需求文件管理

从需求文件安装

go
func installFromRequirements() {
    manager := pip.NewManager(nil)
    
    requirementFiles := []string{
        "requirements.txt",
        "dev-requirements.txt",
        "test-requirements.txt",
    }
    
    for _, file := range requirementFiles {
        if _, err := os.Stat(file); os.IsNotExist(err) {
            fmt.Printf("⚠️  文件 %s 不存在,跳过\n", file)
            continue
        }
        
        fmt.Printf("从 %s 安装依赖...\n", file)
        if err := manager.InstallRequirements(file); err != nil {
            fmt.Printf("❌ 从 %s 安装失败: %v\n", file, err)
        } else {
            fmt.Printf("✅ 从 %s 安装成功\n", file)
        }
    }
}

生成需求文件

go
func generateRequirements() {
    manager := pip.NewManager(nil)
    
    // 生成基本需求文件
    fmt.Println("生成 requirements.txt...")
    if err := manager.GenerateRequirements("requirements.txt"); err != nil {
        log.Fatalf("生成需求文件失败: %v", err)
    }
    
    // 生成冻结的需求文件(精确版本)
    fmt.Println("生成 requirements-freeze.txt...")
    packages, err := manager.FreezePackages()
    if err != nil {
        log.Fatalf("冻结包列表失败: %v", err)
    }
    
    file, err := os.Create("requirements-freeze.txt")
    if err != nil {
        log.Fatalf("创建文件失败: %v", err)
    }
    defer file.Close()
    
    for _, pkg := range packages {
        fmt.Fprintf(file, "%s==%s\n", pkg.Name, pkg.Version)
    }
    
    fmt.Println("✅ 需求文件生成完成")
}

依赖分析

分析包依赖

go
func analyzeDependencies() {
    manager := pip.NewManager(nil)
    
    packageName := "django"
    fmt.Printf("分析 %s 的依赖关系...\n", packageName)
    
    deps, err := manager.GetDependencies(packageName)
    if err != nil {
        log.Fatalf("获取依赖失败: %v", err)
    }
    
    printDependencyTree(deps, 0)
}

func printDependencyTree(tree *pip.DependencyTree, indent int) {
    prefix := strings.Repeat("  ", indent)
    fmt.Printf("%s- %s %s\n", prefix, tree.Package.Name, tree.Package.Version)
    
    for _, dep := range tree.Dependencies {
        printDependencyTree(dep, indent+1)
    }
}

检查依赖冲突

go
func checkDependencyConflicts() {
    manager := pip.NewManager(nil)
    
    fmt.Println("检查依赖冲突...")
    conflicts, err := manager.CheckDependencies()
    if err != nil {
        log.Fatalf("检查依赖失败: %v", err)
    }
    
    if len(conflicts) == 0 {
        fmt.Println("✅ 没有发现依赖冲突")
        return
    }
    
    fmt.Printf("发现 %d 个依赖冲突:\n", len(conflicts))
    for _, conflict := range conflicts {
        fmt.Printf("- %s: %s\n", conflict.Package, conflict.Description)
        fmt.Printf("  需要: %v\n", conflict.Required)
        fmt.Printf("  已安装: %s\n", conflict.Installed)
        fmt.Printf("  冲突: %v\n", conflict.Conflicts)
    }
}

错误处理和重试

健壮的包安装

go
func robustPackageInstall() {
    manager := pip.NewManager(nil)
    
    packages := []*pip.PackageSpec{
        {Name: "requests"},
        {Name: "nonexistent-package-12345"}, // 故意的错误包
        {Name: "click"},
    }
    
    for _, pkg := range packages {
        if err := installWithRetry(manager, pkg, 3); err != nil {
            fmt.Printf("❌ %s 最终安装失败: %v\n", pkg.Name, err)
        } else {
            fmt.Printf("✅ %s 安装成功\n", pkg.Name)
        }
    }
}

func installWithRetry(manager *pip.Manager, pkg *pip.PackageSpec, maxRetries int) error {
    var lastErr error
    
    for i := 0; i < maxRetries; i++ {
        err := manager.InstallPackage(pkg)
        if err == nil {
            return nil // 成功
        }
        
        lastErr = err
        
        // 检查是否为可重试的错误
        if pip.IsErrorType(err, pip.ErrorTypeNetworkError) ||
           pip.IsErrorType(err, pip.ErrorTypeNetworkTimeout) {
            
            fmt.Printf("网络错误,重试 %d/%d...\n", i+1, maxRetries)
            time.Sleep(time.Duration(i+1) * time.Second) // 指数退避
            continue
        }
        
        // 非网络错误,不重试
        return err
    }
    
    return fmt.Errorf("重试 %d 次后仍然失败: %w", maxRetries, lastErr)
}

完整示例:包管理工具

go
package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "strings"
    "time"
    
    "github.com/scagogogo/go-pip-sdk/pkg/pip"
)

func main() {
    manager := pip.NewManager(&pip.Config{
        LogLevel: "INFO",
        Timeout:  60 * time.Second,
    })
    
    for {
        fmt.Println("\n=== Python 包管理工具 ===")
        fmt.Println("1. 安装包")
        fmt.Println("2. 卸载包")
        fmt.Println("3. 列出已安装的包")
        fmt.Println("4. 搜索包")
        fmt.Println("5. 显示包信息")
        fmt.Println("6. 检查过时的包")
        fmt.Println("7. 更新包")
        fmt.Println("8. 生成需求文件")
        fmt.Println("9. 退出")
        fmt.Print("请选择操作 (1-9): ")
        
        reader := bufio.NewReader(os.Stdin)
        choice, _ := reader.ReadString('\n')
        choice = strings.TrimSpace(choice)
        
        switch choice {
        case "1":
            installPackageInteractive(manager, reader)
        case "2":
            uninstallPackageInteractive(manager, reader)
        case "3":
            listPackagesInteractive(manager)
        case "4":
            searchPackagesInteractive(manager, reader)
        case "5":
            showPackageInfoInteractive(manager, reader)
        case "6":
            checkOutdatedInteractive(manager)
        case "7":
            updatePackageInteractive(manager, reader)
        case "8":
            generateRequirementsInteractive(manager)
        case "9":
            fmt.Println("再见!")
            return
        default:
            fmt.Println("无效选择,请重试")
        }
    }
}

func installPackageInteractive(manager *pip.Manager, reader *bufio.Reader) {
    fmt.Print("请输入包名: ")
    packageName, _ := reader.ReadString('\n')
    packageName = strings.TrimSpace(packageName)
    
    fmt.Print("请输入版本约束 (可选,直接回车跳过): ")
    version, _ := reader.ReadString('\n')
    version = strings.TrimSpace(version)
    
    pkg := &pip.PackageSpec{
        Name:    packageName,
        Version: version,
    }
    
    fmt.Printf("正在安装 %s...\n", packageName)
    if err := manager.InstallPackage(pkg); err != nil {
        fmt.Printf("❌ 安装失败: %v\n", err)
    } else {
        fmt.Printf("✅ %s 安装成功\n", packageName)
    }
}

// ... 其他交互函数的实现

下一步

基于 MIT 许可证发布。