仓库:https://huggingface.co/google/siglip-so400m-patch14-384
下载仓库:
展开代码./hfd.sh google/siglip-so400m-patch14-384 --local-dir ./google/siglip-so400m-patch14-384
懒得装环境,直接docker:
展开代码docker run -it \ --gpus '"device=0"' \ -v ./google/siglip-so400m-patch14-384:/google/siglip-so400m-patch14-384 \ -p 8033:8033 \ --shm-size 32g \ kevinchina/deeplearning:llamafactory20250311-3 bash
图片直接推理得到特征:
python展开代码from PIL import Image
import requests
import torch
from transformers import AutoProcessor, AutoModel
# 1. 指定 GPU 设备(如 GPU 0)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# 2. 加载模型并分配到 GPU
model = AutoModel.from_pretrained("/google/siglip-so400m-patch14-384").to(device)
processor = AutoProcessor.from_pretrained("/google/siglip-so400m-patch14-384")
# 3. 加载图像并预处理
url = "http://images.cocodataset.org/val2017/000000039769.jpg"
image = Image.open(requests.get(url, stream=True).raw)
inputs = processor(images=image, return_tensors="pt", padding=True).to(device)  # 输入数据也移到 GPU
# 4. 推理(确保在 GPU 上运行)
with torch.no_grad():
    outputs = model.get_image_features(**inputs)
# 5. 输出特征向量(自动在 GPU 上,如需转 CPU 用 .cpu())
print("Feature vector shape:", outputs.shape)
print("Sample features (first 10 dims):", outputs[0, :10])
得到结果:
展开代码Using device: cuda:0 Feature vector shape: torch.Size([1, 1152]) Sample features (first 10 dims): tensor([ 0.0044, -0.3041, -0.2630, 0.3240, -0.3151, -0.7515, 0.3243, 0.5557, -0.0269, -0.0519], device='cuda:0')
python展开代码import torch
from transformers import AutoProcessor, AutoModel
from PIL import Image
import io
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import uvicorn
app = FastAPI()
# Global variables for model and processor
model = None
processor = None
device = None
@app.on_event("startup")
async def startup_event():
    global model, processor, device
    # 1. Specify GPU device
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    # 2. Load model and processor
    # Make sure the model path is correct as per your Docker volume mapping
    model_path = "/google/siglip-so400m-patch14-384"
    try:
        model = AutoModel.from_pretrained(model_path).to(device)
        processor = AutoProcessor.from_pretrained(model_path)
        print(f"Model and processor loaded successfully from {model_path}")
    except Exception as e:
        print(f"Error loading model/processor: {e}")
        # Optionally, re-raise or handle as critical failure
        raise
@app.post("/extract-features/")
async def extract_features(file: UploadFile = File(...)):
    if not model or not processor:
        return JSONResponse(status_code=503, content={"error": "Model not loaded. Please check server logs."})
    try:
        # 3. Load image from uploaded file
        contents = await file.read()
        image = Image.open(io.BytesIO(contents))
        # Preprocess image
        inputs = processor(images=image, return_tensors="pt", padding=True).to(device)
        # 4. Inference
        with torch.no_grad():
            outputs = model.get_image_features(**inputs)
        # 5. Prepare response
        # Convert tensor to list for JSON serialization
        features_list = outputs.cpu().tolist()
        
        return {"filename": file.filename, "features": features_list}
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
if __name__ == "__main__":
    # This part is for running with uvicorn programmatically,
    # but you'll likely run it from the command line as:
    # uvicorn main:app --host 0.0.0.0 --port 8033
    uvicorn.run(app, host="0.0.0.0", port=8033)
python展开代码import requests
# API endpoint URL
url = "http://10.136.19.27:8033/extract-features/"
# Path to the image file you want to send
# Make sure '077.jpg' is in the same directory as this script, or provide the full path.
image_path = "077.jpg"
# Prepare the files dictionary for the POST request
# The key ('file') must match the parameter name in the FastAPI endpoint
try:
    with open(image_path, "rb") as img_file:
        files = {"file": (image_path, img_file, "image/jpeg")}
        # Send the POST request
        print(f"Sending request for image: {image_path}")
        response = requests.post(url, files=files)
        # Check the response
        if response.status_code == 200:
            print("Successfully received features:")
            result = response.json()
            print(f"Filename: {result.get('filename')}")
            # Print only a subset of features for brevity
            if result.get('features') and isinstance(result['features'], list) and len(result['features']) > 0:
                print(f"Feature vector shape: (1, {len(result['features'][0])})") # Assuming batch size of 1
                print(f"Sample features (first 10 dims): {result['features'][0][:10]}")
            else:
                print(f"Features data: {result.get('features')}")
        else:
            print(f"Error: {response.status_code}")
            try:
                print(f"Response content: {response.json()}")
            except requests.exceptions.JSONDecodeError:
                print(f"Response content: {response.text}")
except FileNotFoundError:
    print(f"Error: Image file not found at {image_path}. Please ensure the file exists.")
except requests.exceptions.ConnectionError:
    print(f"Error: Could not connect to the server at {url}. Ensure the FastAPI server is running.")
except Exception as e:
    print(f"An unexpected error occurred: {e}") 
# 可以得到:
# (base)   三 5月 07 #   21:51:56 #   /mnt/workcode/amex-datasets-process # python client.py 
# Sending request for image: 077.jpg
# Successfully received features:
# Filename: 077.jpg
# Feature vector shape: (1, 1152)
# Sample features (first 10 dims): [0.10803146660327911, 0.017530322074890137, 0.1551334261894226, -0.0017879307270050049, -0.687953531742096, -0.3093825578689575, 0.8667486906051636, -0.04316258430480957, -0.4580515921115875, 0.49320095777511597]
这段代码实现了一个图像匹配与拼接系统,主要功能包括:1) 从三个不同目录(yuantu/原图、Effect_ox1/效果图1、Effect_pf/效果图2)加载图像;2) 通过SigLIP API提取图像特征向量并实现本地缓存优化;3) 使用余弦相似度计算图像间的距离矩阵;4) 采用全局优化算法为原图寻找最匹配的效果图对;5) 将匹配成功的原图与两个效果图水平拼接保存。系统通过特征缓存机制减少API调用,支持断点续传,并采用最优匹配策略确保每组三张图片在视觉特征上高度相似,最终生成高质量的组合图像。
python展开代码import os
import numpy as np
from PIL import Image
import io
from pathlib import Path
# from scipy.fftpack import dct # No longer needed for pHash
from collections import defaultdict
import requests # Added for API calls
from numpy.linalg import norm # Added for cosine similarity
import time # Added for retry backoff
import hashlib  # Added for creating cache keys
import pickle  # Added for feature serialization
from scipy.optimize import linear_sum_assignment  # Added for optimal matching
try:
    from tqdm import tqdm
    tqdm_available = True
except ImportError:
    tqdm_available = False
    print("提示: 安装 tqdm 库可以获得更好的进度条显示 (pip install tqdm)")
# Helper function to resize images before feature extraction
def resize_image_if_needed(img_path, max_dim=768):
    """Opens and resizes an image if it exceeds max_dim, maintaining aspect ratio."""
    img = Image.open(img_path)
    width, height = img.size
    if width > max_dim or height > max_dim:
        ratio = min(max_dim / width, max_dim / height)
        new_width = int(width * ratio)
        new_height = int(height * ratio)
        img = img.resize((new_width, new_height), Image.LANCZOS)
    return img
# New function to just open images without resizing for final collage
def open_original_image(img_path):
    """Opens an image without resizing, for full resolution collages."""
    try:
        return Image.open(img_path)
    except Exception as e:
        print(f"Error opening image {os.path.basename(img_path)}: {e}")
        return None
# 创建保存目录
save_dir = "save"
os.makedirs(save_dir, exist_ok=True)
# 获取所有图片路径
yuantu_dir = os.path.join("heji", "yuantu")
Effect_ox1_dir = os.path.join("heji", "Effect_ox1")
Effect_pf_dir = os.path.join("heji", "Effect_pf")
yuantu_images = [os.path.join(yuantu_dir, f) for f in os.listdir(yuantu_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
Effect_ox1_images = [os.path.join(Effect_ox1_dir, f) for f in os.listdir(Effect_ox1_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
Effect_pf_images = [os.path.join(Effect_pf_dir, f) for f in os.listdir(Effect_pf_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
# --- NEW: API-specific resizing ---
def resize_for_api(img_path, target_size=384):
    """Resizes an image to target_size x target_size specifically for the API request."""
    try:
        img = Image.open(img_path)
        img = img.convert('RGB')  # Ensure RGB mode for consistency
        img = img.resize((target_size, target_size), Image.LANCZOS)
        
        # Return as bytes in memory
        img_byte_arr = io.BytesIO()
        img.save(img_byte_arr, format='JPEG', quality=95)
        img_byte_arr.seek(0)
        return img_byte_arr
    except Exception as e:
        print(f"Error resizing image {os.path.basename(img_path)}: {e}")
        return None
# --- NEW: Feature caching ---
# Create cache directory
cache_dir = "feature_cache"
os.makedirs(cache_dir, exist_ok=True)
def get_cache_path(image_path):
    """Generate a unique cache file path for an image based on its path."""
    # Create a hash of the image path to use as the filename
    img_hash = hashlib.md5(image_path.encode('utf-8')).hexdigest()
    return os.path.join(cache_dir, f"{img_hash}.pkl")
def save_features_to_cache(image_path, features):
    """Save features to the cache."""
    if features is None:
        return False
    
    try:
        cache_path = get_cache_path(image_path)
        with open(cache_path, 'wb') as f:
            pickle.dump(features, f)
        return True
    except Exception as e:
        print(f"Warning: Could not cache features for {os.path.basename(image_path)}: {e}")
        return False
def load_features_from_cache(image_path):
    """Load features from the cache if available."""
    cache_path = get_cache_path(image_path)
    
    # Check if cache file exists and is newer than the image file
    if not os.path.exists(cache_path):
        return None
    
    # Optionally check if image was modified after cache was created
    # Skip this if you don't want to regenerate features when images change
    img_mtime = os.path.getmtime(image_path)
    cache_mtime = os.path.getmtime(cache_path)
    if img_mtime > cache_mtime:
        # Image was modified after the cache was created
        print(f"Image {os.path.basename(image_path)} modified after cache, will re-extract features.")
        return None
    
    try:
        with open(cache_path, 'rb') as f:
            features = pickle.load(f)
        return features
    except Exception as e:
        print(f"Warning: Could not load cached features for {os.path.basename(image_path)}: {e}")
        return None
# --- SigLIP API Configuration ---
SIGLIP_API_URL = "http://10.136.19.27:8033/extract-features/"
# --- Feature Extraction via SigLIP API with retry and caching ---
def compute_siglip_features(image_path, api_url=SIGLIP_API_URL, max_retries=3, use_cache=True):
    """Computes image features using the SigLIP FastAPI service with caching and retry mechanism."""
    # Check cache first if enabled
    if use_cache:
        cached_features = load_features_from_cache(image_path)
        if cached_features is not None:
            # print(f"Using cached features for {os.path.basename(image_path)}")
            return cached_features
    
    # If not in cache or cache disabled, compute features
    # Resize image to 384x384 before sending to API
    img_bytes = resize_for_api(image_path)
    if img_bytes is None:
        return None
    
    retry_count = 0
    backoff_time = 1  # Start with 1 second backoff, will increase exponentially
    
    while retry_count < max_retries:
        try:
            # Send the already resized image instead of opening the file
            files = {"file": (os.path.basename(image_path), img_bytes, "image/jpeg")}
            response = requests.post(api_url, files=files, timeout=30)
            response.raise_for_status()  # Raise an exception for HTTP errors
            result = response.json()
            
            if result.get('features') and isinstance(result['features'], list) and len(result['features']) > 0:
                # Assuming features are returned as a list containing one feature vector list
                feature_vector = np.array(result['features'][0], dtype=np.float32)
                if feature_vector.ndim == 1 and feature_vector.size > 0:
                    # Cache successful result
                    if use_cache:
                        save_features_to_cache(image_path, feature_vector)
                    return feature_vector
                else:
                    print(f"Warning: Invalid feature vector structure for {os.path.basename(image_path)}. Shape: {feature_vector.shape}")
                    return None
            else:
                print(f"Warning: No features returned or unexpected format for {os.path.basename(image_path)}. API response: {result}")
                return None
                
        except requests.exceptions.HTTPError as e:
            retry_count += 1
            if retry_count < max_retries:
                print(f"HTTP Error ({retry_count}/{max_retries}): {e}. Retrying in {backoff_time} seconds...")
                time.sleep(backoff_time)
                backoff_time *= 2  # Exponential backoff
            else:
                print(f"Failed after {max_retries} attempts. HTTP Error: {e}")
                return None
                
        except requests.exceptions.RequestException as e:
            retry_count += 1
            if retry_count < max_retries:
                print(f"Request error ({retry_count}/{max_retries}): {e}. Retrying in {backoff_time} seconds...")
                time.sleep(backoff_time)
                backoff_time *= 2
            else:
                print(f"Failed after {max_retries} attempts. Request error: {e}")
                return None
                
        except Exception as e:
            retry_count += 1
            if retry_count < max_retries:
                print(f"Unexpected error ({retry_count}/{max_retries}): {e}. Retrying in {backoff_time} seconds...")
                time.sleep(backoff_time)
                backoff_time *= 2
            else:
                print(f"Failed after {max_retries} attempts. Unexpected error: {e}")
                return None
# --- NEW: Distance Calculation for SigLIP Features ---
def calculate_siglip_distance(vec1, vec2):
    """Calculates cosine distance (1 - similarity) between two SigLIP feature vectors."""
    if vec1 is None or vec2 is None:
        return float('inf') # Max distance if one feature is missing
    
    vec1 = np.asarray(vec1).flatten()
    vec2 = np.asarray(vec2).flatten()
    if vec1.shape != vec2.shape or vec1.size == 0 or vec2.size == 0:
        # print(f"Warning: Feature vectors have different shapes or are empty. Cannot compute distance. {vec1.shape} vs {vec2.shape}")
        return float('inf') # Max distance for incompatible vectors
    norm_vec1 = norm(vec1)
    norm_vec2 = norm(vec2)
    if norm_vec1 == 0 or norm_vec2 == 0:
        # If one vector is zero, they are maximally dissimilar unless both are zero
        return 1.0 if not (norm_vec1 == 0 and norm_vec2 == 0) else 0.0 
    similarity = np.dot(vec1, vec2) / (norm_vec1 * norm_vec2)
    # Clamp similarity to [-1, 1] to handle potential floating point inaccuracies
    similarity = np.clip(similarity, -1.0, 1.0)
    distance = 1 - similarity # Cosine distance
    return distance
# --- 特征提取 ---
print("正在提取图像特征 (使用 SigLIP API,带缓存)...")
# 提取特征
yuantu_features = {}
Effect_ox1_features = {}
Effect_pf_features = {}
# 对所有图像计算特征
print("处理原图特征...")
yuantu_iter = tqdm(yuantu_images) if tqdm_available else yuantu_images
cached_count = 0
api_count = 0
for image_path in yuantu_iter:
    if not tqdm_available:
        print(f"处理 {len(yuantu_features)+1}/{len(yuantu_images)}: {os.path.basename(image_path)}", end="\r")
    
    # Check if already in cache
    cached_features = load_features_from_cache(image_path)
    if cached_features is not None:
        yuantu_features[image_path] = {'siglip': cached_features}
        cached_count += 1
    else:
        # Request from API
        features = compute_siglip_features(image_path, use_cache=True)
        if features is not None:
            yuantu_features[image_path] = {'siglip': features}
            api_count += 1
    
if not tqdm_available:
    print()
print(f"原图特征: {cached_count} 从缓存加载, {api_count} 从API请求")
print("处理Effect_ox1特征...")
Effect_ox1_iter = tqdm(Effect_ox1_images) if tqdm_available else Effect_ox1_images
cached_count = 0
api_count = 0
for image_path in Effect_ox1_iter:
    if not tqdm_available:
        print(f"处理 {len(Effect_ox1_features)+1}/{len(Effect_ox1_images)}: {os.path.basename(image_path)}", end="\r")
    
    # Check if already in cache
    cached_features = load_features_from_cache(image_path)
    if cached_features is not None:
        Effect_ox1_features[image_path] = {'siglip': cached_features}
        cached_count += 1
    else:
        # Request from API
        features = compute_siglip_features(image_path, use_cache=True)
        if features is not None:
            Effect_ox1_features[image_path] = {'siglip': features}
            api_count += 1
    
if not tqdm_available:
    print()
print(f"Effect_ox1特征: {cached_count} 从缓存加载, {api_count} 从API请求")
    
print("处理Effect_pf特征...")
Effect_pf_iter = tqdm(Effect_pf_images) if tqdm_available else Effect_pf_images
cached_count = 0
api_count = 0
for image_path in Effect_pf_iter:
    if not tqdm_available:
        print(f"处理 {len(Effect_pf_features)+1}/{len(Effect_pf_images)}: {os.path.basename(image_path)}", end="\r")
    
    # Check if already in cache
    cached_features = load_features_from_cache(image_path)
    if cached_features is not None:
        Effect_pf_features[image_path] = {'siglip': cached_features}
        cached_count += 1
    else:
        # Request from API
        features = compute_siglip_features(image_path, use_cache=True)
        if features is not None:
            Effect_pf_features[image_path] = {'siglip': features}
            api_count += 1
    
if not tqdm_available:
    print()
print(f"Effect_pf特征: {cached_count} 从缓存加载, {api_count} 从API请求")
print("特征提取完成!")
# --- NEW: 全局优化匹配算法 ---
print("开始全局优化匹配...")
# 将所有图片转换为索引列表,方便后续处理
valid_yuantu_images = [img_path for img_path in yuantu_images if img_path in yuantu_features]
valid_Effect_ox1_images = [img_path for img_path in Effect_ox1_images if img_path in Effect_ox1_features]
valid_Effect_pf_images = [img_path for img_path in Effect_pf_images if img_path in Effect_pf_features]
print(f"有效图片数量: 原图={len(valid_yuantu_images)}, Effect_ox1={len(valid_Effect_ox1_images)}, Effect_pf={len(valid_Effect_pf_images)}")
if len(valid_yuantu_images) == 0 or len(valid_Effect_ox1_images) == 0 or len(valid_Effect_pf_images) == 0:
    print("错误: 至少一个文件夹中没有有效的特征图片,无法进行匹配。")
    exit(1)
# 创建距离矩阵:yuantu-Effect_ox1和yuantu-Effect_pf
print("计算所有图片对之间的距离...")
# 计算yuantu与Effect_ox1之间的距离矩阵
yuantu_Effect_ox1_distances = np.zeros((len(valid_yuantu_images), len(valid_Effect_ox1_images)))
for i, yuantu_path in enumerate(valid_yuantu_images):
    yuantu_feat = yuantu_features[yuantu_path]['siglip']
    for j, Effect_ox1_path in enumerate(valid_Effect_ox1_images):
        Effect_ox1_feat = Effect_ox1_features[Effect_ox1_path]['siglip']
        yuantu_Effect_ox1_distances[i, j] = calculate_siglip_distance(yuantu_feat, Effect_ox1_feat)
# 计算yuantu与Effect_pf之间的距离矩阵
yuantu_Effect_pf_distances = np.zeros((len(valid_yuantu_images), len(valid_Effect_pf_images)))
for i, yuantu_path in enumerate(valid_yuantu_images):
    yuantu_feat = yuantu_features[yuantu_path]['siglip']
    for j, Effect_pf_path in enumerate(valid_Effect_pf_images):
        Effect_pf_feat = Effect_pf_features[Effect_pf_path]['siglip']
        yuantu_Effect_pf_distances[i, j] = calculate_siglip_distance(yuantu_feat, Effect_pf_feat)
print("距离计算完成,开始优化匹配...")
# 优化匹配逻辑
matched_triplets = []
remaining_yuantu = list(range(len(valid_yuantu_images)))
remaining_Effect_ox1 = list(range(len(valid_Effect_ox1_images)))
remaining_Effect_pf = list(range(len(valid_Effect_pf_images)))
# 因为我们需要三个文件夹的最优匹配,我们会迭代地移除已匹配的图片
iteration = 0
max_iterations = min(len(valid_yuantu_images), len(valid_Effect_ox1_images), len(valid_Effect_pf_images))
while (iteration < max_iterations and 
       len(remaining_yuantu) > 0 and 
       len(remaining_Effect_ox1) > 0 and 
       len(remaining_Effect_pf) > 0):
    
    iteration += 1
    print(f"匹配迭代 {iteration}/{max_iterations},剩余: 原图={len(remaining_yuantu)}, Effect_ox1={len(remaining_Effect_ox1)}, Effect_pf={len(remaining_Effect_pf)}")
    
    # 构建当前迭代的子距离矩阵
    curr_yuantu_Effect_ox1 = np.zeros((len(remaining_yuantu), len(remaining_Effect_ox1)))
    for i, yuantu_idx in enumerate(remaining_yuantu):
        for j, Effect_ox1_idx in enumerate(remaining_Effect_ox1):
            curr_yuantu_Effect_ox1[i, j] = yuantu_Effect_ox1_distances[yuantu_idx, Effect_ox1_idx]
    
    curr_yuantu_Effect_pf = np.zeros((len(remaining_yuantu), len(remaining_Effect_pf)))
    for i, yuantu_idx in enumerate(remaining_yuantu):
        for j, Effect_pf_idx in enumerate(remaining_Effect_pf):
            curr_yuantu_Effect_pf[i, j] = yuantu_Effect_pf_distances[yuantu_idx, Effect_pf_idx]
    
    # 合并距离矩阵,寻找最佳组合
    best_total_distance = float('inf')
    best_triplet = None
    
    for i in range(len(remaining_yuantu)):
        yuantu_idx = remaining_yuantu[i]
        
        # 找到与当前yuantu最相似的Effect_ox1
        Effect_ox1_distances = curr_yuantu_Effect_ox1[i, :]
        min_Effect_ox1_distance = np.min(Effect_ox1_distances)
        min_Effect_ox1_j = np.argmin(Effect_ox1_distances)
        Effect_ox1_idx = remaining_Effect_ox1[min_Effect_ox1_j]
        
        # 找到与当前yuantu最相似的Effect_pf
        Effect_pf_distances = curr_yuantu_Effect_pf[i, :]
        min_Effect_pf_distance = np.min(Effect_pf_distances)
        min_Effect_pf_j = np.argmin(Effect_pf_distances)
        Effect_pf_idx = remaining_Effect_pf[min_Effect_pf_j]
        
        # 计算总距离
        total_distance = min_Effect_ox1_distance + min_Effect_pf_distance
        
        # 更新最佳匹配
        if total_distance < best_total_distance:
            best_total_distance = total_distance
            best_triplet = (yuantu_idx, Effect_ox1_idx, Effect_pf_idx, min_Effect_ox1_j, min_Effect_pf_j)
    
    if best_triplet:
        yuantu_idx, Effect_ox1_idx, Effect_pf_idx, Effect_ox1_local_idx, Effect_pf_local_idx = best_triplet
        matched_triplets.append((valid_yuantu_images[yuantu_idx], 
                                valid_Effect_ox1_images[Effect_ox1_idx], 
                                valid_Effect_pf_images[Effect_pf_idx]))
        
        # 从可用索引中移除已匹配的图片
        remaining_yuantu.remove(yuantu_idx)
        remaining_Effect_ox1.remove(Effect_ox1_idx)
        remaining_Effect_pf.remove(Effect_pf_idx)
    else:
        # 如果无法找到最佳匹配,退出循环
        print("无法找到更多的最佳匹配,结束匹配过程。")
        break
print(f"成功匹配了 {len(matched_triplets)} 组三元组。")
# 剩下图片数量不平衡的情况下,可以选择一些启发式方法来处理
# 这里为简单起见,我们就只使用已匹配的三元组
# --- 拼接和保存图像 ---
print("开始拼接和保存图像...")
print("使用原始尺寸图像进行拼接,可能会生成较大文件...")
matched_iter = tqdm(enumerate(matched_triplets), total=len(matched_triplets)) if tqdm_available else enumerate(matched_triplets)
for i, (yuantu_path, Effect_ox1_path, Effect_pf_path) in matched_iter:
    if not tqdm_available:
        print(f"拼接图片 {i+1}/{len(matched_triplets)}", end="\r")
    
    try:
        # 打开原始尺寸图片,不进行预缩放
        yuantu_img = open_original_image(yuantu_path)
        Effect_ox1_img = open_original_image(Effect_ox1_path)
        Effect_pf_img = open_original_image(Effect_pf_path)
        
        if not yuantu_img or not Effect_ox1_img or not Effect_pf_img:
            print(f"无法打开三元组中的一个或多个图像,跳过: {os.path.basename(yuantu_path)}")
            continue
        
        # 获取原始尺寸
        yuantu_w, yuantu_h = yuantu_img.size
        Effect_ox1_w, Effect_ox1_h = Effect_ox1_img.size
        Effect_pf_w, Effect_pf_h = Effect_pf_img.size
        
        # 计算最大高度,使用原图的高度作为参考
        target_height = yuantu_h
        
        # 调整其他图片高度与原图一致,保持宽高比
        new_Effect_ox1_w = int(Effect_ox1_w * target_height / Effect_ox1_h)
        Effect_ox1_img = Effect_ox1_img.resize((new_Effect_ox1_w, target_height), Image.LANCZOS)
        
        new_Effect_pf_w = int(Effect_pf_w * target_height / Effect_pf_h)
        Effect_pf_img = Effect_pf_img.resize((new_Effect_pf_w, target_height), Image.LANCZOS)
        
        # 创建新图像
        total_width = yuantu_w + new_Effect_ox1_w + new_Effect_pf_w
        new_img = Image.new('RGB', (total_width, target_height))
        
        # 粘贴图像
        current_x = 0
        new_img.paste(yuantu_img, (current_x, 0))
        current_x += yuantu_w
        new_img.paste(Effect_ox1_img, (current_x, 0))
        current_x += new_Effect_ox1_w
        new_img.paste(Effect_pf_img, (current_x, 0))
        
        # 保存拼接图像
        save_path = os.path.join(save_dir, f"{i+1:03d}.jpg")
        new_img.save(save_path, quality=95)  # 使用高质量保存
        
    except Exception as e:
        print(f"拼接图片时出错 ({i+1}): {e}")
        continue
print("\n所有图片拼接完成!共生成 {len(matched_triplets)} 张拼接图像。") 


本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!