Hi,
I have Cosmos Predict2 2B Video2World running on an EC2 GPU instance (Ampere architecture). However, during inference I keep getting this error:
Processing failed: The size of tensor a (24) must match the size of tensor b (93) at non-singleton dimension 2
I am using the same input image as in the documentation: nvidia/Cosmos-Predict2-2B-Video2World · Hugging Face
I have looked at this from every angle but can't find the problem. Any ideas?
Thanks in advance!
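For reference, the basic usage pattern I'm building on looks roughly like this (a simplified single-GPU sketch based on the diffusers Cosmos2VideoToWorldPipeline API; the image path and prompt are placeholders, not the exact model-card example):

import torch
from diffusers import Cosmos2VideoToWorldPipeline
from diffusers.utils import export_to_video, load_image

# Load the 2B Video2World checkpoint in bfloat16 on a single GPU
pipe = Cosmos2VideoToWorldPipeline.from_pretrained(
    "nvidia/Cosmos-Predict2-2B-Video2World", torch_dtype=torch.bfloat16
)
pipe.to("cuda")

image = load_image("input.png")  # placeholder; I actually use the model-card input image
frames = pipe(
    image=image,
    prompt="A description of the desired motion",  # placeholder prompt
    negative_prompt="Static image, no motion, blurry",
    generator=torch.Generator().manual_seed(42),
).frames[0]
export_to_video(frames, "output.mp4", fps=16)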
My code:

import gc
import os
import tempfile
import time

import boto3
import torch
from botocore.exceptions import ClientError, NoCredentialsError
from diffusers import Cosmos2VideoToWorldPipeline
from diffusers.utils import export_to_video, load_image


class CosmosVideoProcessor:
    def __init__(self, input_bucket=None, output_bucket=None,
                 cache_dir='/data/hf_cache', aws_region='us-east-1'):
        self.input_bucket = input_bucket
        self.output_bucket = output_bucket
        self.cache_dir = cache_dir
        self.aws_region = aws_region
        self.pipe = None
        self.s3_client = self._setup_s3()
        self._setup_gpu_environment()

    def _setup_s3(self):
        try:
            client = boto3.client('s3', region_name=self.aws_region)
            client.list_buckets()
            print("S3 client initialized")
            return client
        except NoCredentialsError:
            print("ERROR: AWS credentials not found. Configure with 'aws configure'")
            raise
        except Exception as e:
            print(f"ERROR: S3 setup failed: {e}")
            raise

    def _setup_gpu_environment(self):
        """Optimized GPU setup for p4d.24xlarge (8x A100 GPUs)."""
        print("Setting up GPU environment...")
        # Display all available GPUs
        for i in range(torch.cuda.device_count()):
            gpu = torch.cuda.get_device_properties(i)
            print(f"  GPU {i}: {gpu.name} ({gpu.total_memory / 1e9:.1f} GB)")
        # Memory allocator settings
        os.environ.update({
            'PYTORCH_CUDA_ALLOC_CONF': 'max_split_size_mb:256,expandable_segments:True',
            'CUDA_LAUNCH_BLOCKING': '0',
            'NCCL_P2P_DISABLE': '1',
        })
        # Enable TF32 and cuDNN autotuning
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True
        torch.backends.cudnn.benchmark = True
        self._clear_gpu_memory()

    def _clear_gpu_memory(self):
        """Clear GPU memory across all devices."""
        print("Clearing GPU memory...")
        gc.collect()
        for i in range(torch.cuda.device_count()):
            with torch.cuda.device(i):
                torch.cuda.empty_cache()
                torch.cuda.synchronize()

    def _check_gpu_memory(self):
        """Check memory usage across all GPUs."""
        print("GPU Memory Status:")
        gpu_info = {}
        for i in range(torch.cuda.device_count()):
            allocated = torch.cuda.memory_allocated(i) / 1e9
            reserved = torch.cuda.memory_reserved(i) / 1e9
            total = torch.cuda.get_device_properties(i).total_memory / 1e9
            free = total - reserved
            gpu_info[i] = {
                'allocated': allocated,
                'reserved': reserved,
                'total': total,
                'free': free,
                'usable': free > 8.0  # Need 8 GB+ free per GPU
            }
            status = "READY" if gpu_info[i]['usable'] else "LOW MEM"
            print(f"  GPU {i}: {allocated:.1f}GB used, {free:.1f}GB free - {status}")
        usable_gpus = [i for i in range(torch.cuda.device_count()) if gpu_info[i]['usable']]
        print(f"Usable GPUs: {usable_gpus}")
        return gpu_info, usable_gpus

    def _find_cached_model(self):
        """Look for a cached model to avoid re-downloading."""
        model_dir = os.path.join(self.cache_dir, "models--nvidia--Cosmos-Predict2-2B-Video2World")
        snapshots_dir = os.path.join(model_dir, "snapshots")
        if not os.path.exists(snapshots_dir):
            return None
        snapshots = [d for d in os.listdir(snapshots_dir)
                     if os.path.isdir(os.path.join(snapshots_dir, d))]
        if snapshots:
            path = os.path.join(snapshots_dir, snapshots[0])
            print(f"Found cached model: {path}")
            return path
        return None

    def load_model(self):
        """Load the Cosmos pipeline with multi-GPU support."""
        if self.pipe is not None:
            print("Model already loaded")
            return
        print("Loading Cosmos pipeline...")
        self._clear_gpu_memory()
        # Check GPU availability
        gpu_info, usable_gpus = self._check_gpu_memory()
        if not usable_gpus:
            raise RuntimeError("No GPUs with sufficient memory available!")
        model_path = self._find_cached_model() or "nvidia/Cosmos-Predict2-2B-Video2World"
        # Create offload directory
        offload_dir = "/data/offload"
        os.makedirs(offload_dir, exist_ok=True)
        # Try different loading strategies, from most to least parallel
        strategies = [
            {
                "name": "Multi-GPU Balanced",
                "device_map": "balanced",
                "max_memory": {i: f"{int(gpu_info[i]['free'] * 0.8)}GB" for i in usable_gpus},
                "offload": True
            },
            {
                "name": "Single GPU with Offload",
                "device_map": None,
                "target_gpu": usable_gpus[0],
                "offload": True
            },
            {
                "name": "CPU Offload Only",
                "device_map": None,
                "target_gpu": usable_gpus[0],
                "cpu_offload": True
            }
        ]
        for strategy in strategies:
            try:
                print(f"Trying: {strategy['name']}")
                self._clear_gpu_memory()
                # Base pipeline arguments
                pipe_kwargs = {
                    "torch_dtype": torch.bfloat16,
                    "low_cpu_mem_usage": True,
                    "cache_dir": self.cache_dir,
                }
                # Add strategy-specific arguments
                if strategy["device_map"] == "balanced":
                    pipe_kwargs.update({
                        "device_map": "balanced",
                        "max_memory": strategy["max_memory"],
                        "offload_folder": offload_dir
                    })
                # Load the pipeline
                self.pipe = Cosmos2VideoToWorldPipeline.from_pretrained(
                    model_path, **pipe_kwargs
                )
                # Apply post-loading optimizations
                if strategy.get("cpu_offload"):
                    print("  Enabling CPU offload...")
                    self.pipe.enable_model_cpu_offload(gpu_id=strategy["target_gpu"])
                elif strategy.get("target_gpu") is not None and strategy["device_map"] is None:
                    device = f"cuda:{strategy['target_gpu']}"
                    print(f"  Moving to {device}...")
                    self.pipe = self.pipe.to(device)
                # Enable memory optimizations
                print("  Enabling optimizations...")
                self.pipe.enable_attention_slicing("max")
                if hasattr(self.pipe, 'enable_vae_slicing'):
                    self.pipe.enable_vae_slicing()
                if hasattr(self.pipe, 'enable_vae_tiling'):
                    self.pipe.enable_vae_tiling()
                print(f"Model loaded successfully with {strategy['name']}")
                self._check_gpu_memory()
                return
            except Exception as e:
                print(f"{strategy['name']} failed: {e}")
                if hasattr(self, 'pipe') and self.pipe is not None:
                    del self.pipe
                    self.pipe = None
                self._clear_gpu_memory()
                continue
        raise RuntimeError("Failed to load pipeline with any strategy")

    def _generate_video(self, image, prompt, **kwargs):
        """Generate video with adaptive quality settings."""
        # Default parameters optimized for A100 GPUs
        params = {
            'negative_prompt': "Static image, no motion, blurry, low quality, distorted",
            'num_inference_steps': 8,
            'guidance_scale': 7.5,
            'fps': 16,
            'seed': 42
        }
        params.update(kwargs)
        print("Generating video...")
        print(f"  Prompt: {prompt}")
        print(f"  Steps: {params['num_inference_steps']}, Guidance: {params['guidance_scale']}")
        generator = torch.Generator().manual_seed(params['seed'])
        # Keep the original image for the first attempt
        original_image = image.copy()
        print(f"  Original size: {original_image.size}")
        try:
            # First try with the original image size; num_frames is not passed,
            # so the pipeline's default frame count is used
            print("  Attempting with original image size...")
            self._clear_gpu_memory()
            with torch.amp.autocast('cuda', enabled=True, dtype=torch.bfloat16):
                result = self.pipe(
                    image=original_image,
                    prompt=prompt,
                    negative_prompt=params['negative_prompt'],
                    num_inference_steps=params['num_inference_steps'],
                    guidance_scale=params['guidance_scale'],
                    fps=params['fps'],
                    generator=generator,
                )
            return result.frames[0]
        except (torch.cuda.OutOfMemoryError, RuntimeError) as e:
            print(f"Error with original size: {e}")
            print("Trying with model-compatible dimensions...")
            # Use aspect ratios and dimensions expected to work with the model
            w, h = original_image.size
            aspect_ratio = w / h
            if aspect_ratio > 1.5:  # Wide image
                target_size = (768, 432)  # 16:9 aspect ratio, divisible by 24
            elif aspect_ratio < 0.75:  # Tall image
                target_size = (432, 768)  # 9:16 aspect ratio, divisible by 24
            else:  # Square-ish image
                target_size = (576, 576)  # Square, divisible by 24
            resized_image = original_image.resize(target_size)
            print(f"  Resized to {target_size} for compatibility")
            try:
                self._clear_gpu_memory()
                with torch.amp.autocast('cuda', enabled=True, dtype=torch.bfloat16):
                    result = self.pipe(
                        image=resized_image,
                        prompt=prompt,
                        negative_prompt=params['negative_prompt'],
                        num_inference_steps=params['num_inference_steps'],
                        guidance_scale=params['guidance_scale'],
                        fps=params['fps'],
                        generator=generator,
                    )
                return result.frames[0]
            except (torch.cuda.OutOfMemoryError, RuntimeError) as e2:
                print(f"Still having issues: {e2}")
                print("Final attempt with smallest compatible size...")
                # Final fallback: 372x372 (372 = 4 * 93, matching the 93 from the error)
                smallest_image = original_image.resize((372, 372))
                print("  Resized to 372x372 for final attempt")
                params['num_inference_steps'] = 5  # Reduce steps to save memory
                self._clear_gpu_memory()
                with torch.amp.autocast('cuda', enabled=True, dtype=torch.bfloat16):
                    result = self.pipe(
                        image=smallest_image,
                        prompt=prompt,
                        negative_prompt=params['negative_prompt'],
                        num_inference_steps=params['num_inference_steps'],
                        guidance_scale=params['guidance_scale'],
                        fps=params['fps'],
                        generator=generator,
                    )
                return result.frames[0]

    def _download_from_s3(self, s3_key, local_path, bucket=None):
        """Download a file from S3."""
        bucket = bucket or self.input_bucket
        if not bucket:
            raise ValueError("No input bucket specified")
        print(f"Downloading s3://{bucket}/{s3_key}")
        try:
            self.s3_client.download_file(bucket, s3_key, local_path)
            size_mb = os.path.getsize(local_path) / 1e6
            print(f"Downloaded: {size_mb:.1f} MB")
            return local_path
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                print(f"File not found: s3://{bucket}/{s3_key}")
            else:
                print(f"S3 error: {e}")
            return None

    def _upload_to_s3(self, local_path, s3_key, bucket=None):
        """Upload a file to S3."""
        bucket = bucket or self.output_bucket
        if not bucket:
            raise ValueError("No output bucket specified")
        print(f"Uploading to s3://{bucket}/{s3_key}")
        try:
            self.s3_client.upload_file(local_path, bucket, s3_key)
            print("Upload successful")
            return f"s3://{bucket}/{s3_key}"
        except Exception as e:
            print(f"Upload failed: {e}")
            return None

    def process_image_to_video(self, input_s3_key, prompt, output_s3_key=None, **kwargs):
        """Main processing function."""
        if self.pipe is None:
            self.load_model()
        if output_s3_key is None:
            base_name = os.path.splitext(input_s3_key)[0]
            timestamp = int(time.time())
            output_s3_key = f"{base_name}_video_{timestamp}.mp4"
        # Create temp files for the input image and output video (unlinked in finally)
        with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as input_temp, \
             tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as output_temp:
            input_path = input_temp.name
            output_path = output_temp.name
        try:
            print(f"Processing: s3://{self.input_bucket}/{input_s3_key}")
            # Download input image
            if not self._download_from_s3(input_s3_key, input_path):
                return {'error': 'Failed to download input image'}
            # Load and process image
            image = load_image(input_path)
            print(f"  Input size: {image.size}")
            # Generate video
            start_time = time.time()
            video_frames = self._generate_video(image, prompt, **kwargs)
            generation_time = time.time() - start_time
            if video_frames is None:
                return {'error': 'Video generation failed'}
            print(f"Generated {len(video_frames)} frames in {generation_time:.1f}s")
            # Export video
            fps = kwargs.get('fps', 16)
            export_to_video(video_frames, output_path, fps=fps)
            file_size_mb = os.path.getsize(output_path) / 1e6
            # Upload result
            s3_url = self._upload_to_s3(output_path, output_s3_key)
            if not s3_url:
                return {'error': 'Failed to upload video'}
            return {
                'input_s3': f"s3://{self.input_bucket}/{input_s3_key}",
                'output_s3': s3_url,
                'generation_time': generation_time,
                'output_size_mb': file_size_mb,
                'num_frames': len(video_frames),
                'fps': fps,
                'image_size': image.size
            }
        except Exception as e:
            print(f"Processing failed: {e}")
            return {'error': str(e)}
        finally:
            # Cleanup
            for path in [input_path, output_path]:
                if os.path.exists(path):
                    os.unlink(path)
            self._clear_gpu_memory()


def main():
    """Main execution function."""
    print("Cosmos Image-to-Video Processing (p4d.24xlarge)")
    print("=" * 50)
    # Configuration
    INPUT_BUCKET = "images-for-inference-testing"
    OUTPUT_BUCKET = "images-for-inference-testing"
    INPUT_IMAGE = "test.png"
    PROMPT = "Doing the dishes fast"
    # Generation parameters
    params = {
        'negative_prompt': "Static image, no motion, blurry, low quality",
        'fps': 16,
        'seed': 42,
        'num_inference_steps': 8,
        'guidance_scale': 7.5
    }
    try:
        processor = CosmosVideoProcessor(
            input_bucket=INPUT_BUCKET,
            output_bucket=OUTPUT_BUCKET
        )
        result = processor.process_image_to_video(INPUT_IMAGE, PROMPT, **params)
        if 'error' not in result:
            print("\nSUCCESS!")
            print(f"  Input: {result['input_s3']}")
            print(f"  Output: {result['output_s3']}")
            print(f"  Generation time: {result['generation_time']:.1f}s")
            print(f"  Output size: {result['output_size_mb']:.1f} MB")
            print(f"  Frames: {result['num_frames']} @ {result['fps']} FPS")
        else:
            print(f"FAILED: {result['error']}")
    except Exception as e:
        print(f"ERROR: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    # Verify hardware requirements
    if not torch.cuda.is_available():
        print("ERROR: CUDA not available!")
        exit(1)
    gpu_count = torch.cuda.device_count()
    print(f"Found {gpu_count} GPUs")
    # p4d.24xlarge has 8 A100 GPUs
    if gpu_count < 8:
        print(f"WARNING: Expected 8 GPUs for p4d.24xlarge, found {gpu_count}")
    main()