#!/usr/bin/env python3
"""
Pi-RAP: Pi-based Reconstruction & Archival Protocol
Complete File System with Error Correction, Encryption, Compression
Inspired by "No Way Out" (1987) reconstruction scene
Features:
- Pi-based sector distribution
- Reed-Solomon error correction
- Pi stream cipher encryption
- Zlib compression
- Network distribution simulation
- Checksum verification
- Progress tracking
"""
import os
import sys
import zlib
import hashlib
import random
import struct
import json
import time
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional
import math
# Try to import reedsolo for advanced error correction
try:
    import reedsolo  # optional third-party dependency for real Reed-Solomon
    REED_SOLOMON_AVAILABLE = True
except ImportError:
    # ReedSolomonCodec falls back to a simple parity scheme in this case.
    REED_SOLOMON_AVAILABLE = False
    print("Note: Install 'reedsolo' for advanced error correction: pip install reedsolo")
class PiDigitGenerator:
    """Generate the fractional digits of pi ("14159...", leading 3 omitted)."""

    def __init__(self):
        # 10,000 digits is plenty of keystream/addressing material for this
        # demo; positions beyond that wrap around (see get_digit).
        self.digits = self._calculate_pi_digits(10000)

    def _calculate_pi_digits(self, n_digits: int) -> str:
        """Return the first n_digits fractional digits of pi as a string.

        Uses Machin's formula pi = 16*arctan(1/5) - 4*arctan(1/239) evaluated
        in scaled integer arithmetic, which is exact and fast for thousands of
        digits. (The previous spigot variant mishandled the j == 0 term,
        producing incorrect digits, and iterated n_digits * (10n/3) times.)
        """
        if n_digits <= 0:
            return ""
        if n_digits < 100:
            # Pre-computed digits cover small requests cheaply.
            pi_str = ("14159265358979323846264338327950288419716939937510"
                      "58209749445923078164062862089986280348253421170679")
            return pi_str[:n_digits]
        guard = 10  # extra digits so truncation never corrupts the output
        scale = 10 ** (n_digits + guard)

        def scaled_arctan_inv(x: int) -> int:
            """Return arctan(1/x) * scale via the alternating Taylor series."""
            term = total = scale // x
            x_squared = x * x
            divisor = 1
            sign = 1
            while term:
                term //= x_squared
                divisor += 2
                sign = -sign
                total += sign * (term // divisor)
            return total

        pi_scaled = 16 * scaled_arctan_inv(5) - 4 * scaled_arctan_inv(239)
        # str(pi_scaled) is "3" followed by the fractional digits.
        return str(pi_scaled)[1:1 + n_digits]

    def get_digit(self, position: int) -> int:
        """Get the pi digit at `position` (0-indexed), wrapping past the end."""
        return int(self.digits[position % len(self.digits)])

    def get_digits(self, start: int, length: int) -> str:
        """Get `length` consecutive pi digits starting at `start`."""
        return ''.join(str(self.get_digit(start + i)) for i in range(length))
class PiStreamCipher:
    """XOR stream cipher whose keystream is derived from pi digits.

    NOTE(review): every encrypt() call restarts the keystream at `seed`, so
    the same keystream is reused for every message/sector — fine for this
    demo's obfuscation, but not cryptographically sound.
    """

    def __init__(self, pi_gen: "PiDigitGenerator", seed: int = 0):
        self.pi_gen = pi_gen  # digit source; only get_digits() is used
        self.seed = seed      # starting digit position of the keystream

    def _generate_keystream(self, length: int) -> bytes:
        """Derive `length` keystream bytes; each byte is 4 pi digits mod 256."""
        keystream = bytearray()
        position = self.seed
        while len(keystream) < length:
            chunk = self.pi_gen.get_digits(position, 4)
            keystream.append(int(chunk) % 256)
            position += 4
        return bytes(keystream[:length])

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt `data` by XOR-ing it with the pi keystream."""
        keystream = self._generate_keystream(len(data))
        return bytes(a ^ b for a, b in zip(data, keystream))

    def decrypt(self, data: bytes) -> bytes:
        """Decrypt `data`; XOR is its own inverse, so this just re-encrypts."""
        return self.encrypt(data)
class ReedSolomonCodec:
    """Error correction: Reed-Solomon when `reedsolo` is installed, otherwise
    a simple repeated-parity-byte fallback (detection only, no correction).
    """

    def __init__(self, nsym: int = 10):
        """
        nsym: number of error-correction symbols appended to each payload.
        More symbols = stronger correction but larger overhead.
        """
        self.nsym = nsym
        self.rs = None
        try:
            # Imported locally so the class is self-contained and does not
            # depend on the module-level availability flag.
            import reedsolo
            self.rs = reedsolo.RSCodec(nsym)
        except Exception:
            self.rs = None  # fall back to the parity scheme

    def encode(self, data: bytes) -> bytes:
        """Append error-correction symbols to `data`."""
        if self.rs:
            return self.rs.encode(data)
        # Fallback: repeat a single checksum byte nsym times so the framing
        # overhead matches what decode() strips.
        parity = bytes([sum(data) % 256])
        return data + parity * self.nsym

    def decode(self, data: bytes) -> bytes:
        """Strip error-correction symbols, correcting errors when possible."""
        if self.rs:
            try:
                return self.rs.decode(data)[0]
            except Exception:
                # Uncorrectable: return the payload bytes unchanged.
                return data[:len(data) - self.nsym]
        # Parity fallback: just strip the appended bytes.
        return data[:-self.nsym] if self.nsym > 0 else data
class NetworkDistributor:
    """Simulate distributing sectors across a set of storage nodes.

    Each sector is written to one primary node and one backup node, so a
    single node failure can be healed via recover_from_backup().
    """

    def __init__(self, num_nodes: int = 5):
        self.num_nodes = num_nodes
        # node name -> {sector_id: {'data': bytes, 'type': 'primary'|'backup'}}
        self.nodes = {f"node_{i}": {} for i in range(num_nodes)}

    def distribute(self, sectors: Dict[str, dict]) -> Dict[str, Dict]:
        """Place every sector on a primary and a backup node.

        Node choice is derived from the sector address's numeric suffix, so
        placement is deterministic. Returns
        {sector_id: {'primary': node, 'backup': node}}.
        NOTE(review): with num_nodes == 1 the backup write overwrites the
        primary record on the same node; use at least 2 nodes.
        """
        distribution = {}
        for sector_id, sector_data in sectors.items():
            # "...sector_000123.dat" -> 123; its last digit picks the nodes.
            sector_num = int(sector_id.split('_')[-1].replace('.dat', ''))
            pi_digit = sector_num % 10
            primary_node = f"node_{pi_digit % self.num_nodes}"
            backup_node = f"node_{(pi_digit + 1) % self.num_nodes}"
            self.nodes[primary_node][sector_id] = {
                'data': sector_data['data'],
                'type': 'primary'
            }
            self.nodes[backup_node][sector_id] = {
                'data': sector_data['data'],
                'type': 'backup'
            }
            distribution[sector_id] = {
                'primary': primary_node,
                'backup': backup_node
            }
        return distribution

    def simulate_node_failure(self, failed_node: str) -> int:
        """Wipe one node; return how many sector records it was holding."""
        if failed_node in self.nodes:
            lost_sectors = len(self.nodes[failed_node])
            self.nodes[failed_node] = {}
            return lost_sectors
        return 0

    def recover_from_backup(self, lost_sectors: List[str]) -> Dict[str, dict]:
        """Promote backup copies of `lost_sectors` to primary.

        Returns {sector_id: data} for every sector that had a live backup.
        """
        recovered = {}
        for sector_id in lost_sectors:
            for node_name, node_data in self.nodes.items():
                if sector_id in node_data:
                    if node_data[sector_id]['type'] == 'backup':
                        # Promote the backup so get_all_sectors() sees it.
                        node_data[sector_id]['type'] = 'primary'
                        recovered[sector_id] = node_data[sector_id]['data']
                        break
        return recovered

    def get_all_sectors(self) -> Dict[str, dict]:
        """Collect every primary sector record from every node."""
        all_sectors = {}
        for node_name, node_data in self.nodes.items():
            for sector_id, sector_info in node_data.items():
                if sector_info['type'] == 'primary':
                    all_sectors[sector_id] = {
                        'data': sector_info['data'],
                        'node': node_name
                    }
        return all_sectors
class PiRAPFileSystem:
    """Main Pi-RAP file system.

    Pipeline: read -> compress -> sectorize -> encrypt + error-correct ->
    scatter across simulated network nodes -> recover -> decompress -> verify.
    """

    # Fallback settings applied for any key the caller's config omits.
    DEFAULT_CONFIG = {
        'sector_size': 50000,  # bits per sector
        'compression': True,
        'encryption': True,
        'error_correction': True,
        'encryption_seed': 0,
        'network_nodes': 5,
        'redundancy_level': 2,
        'verbose': True
    }

    def __init__(self, config: dict = None):
        # Merge user overrides over the defaults so a partial config dict
        # (e.g. the one built by main()) still has every expected key.
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}
        # Initialize pipeline components
        self.pi_gen = PiDigitGenerator()
        self.cipher = PiStreamCipher(self.pi_gen, self.config.get('encryption_seed', 0))
        self.rs_codec = ReedSolomonCodec(nsym=10 if self.config.get('error_correction') else 0)
        self.network = NetworkDistributor(self.config.get('network_nodes', 5))
        # Sector size is configured in bits; all storage math uses bytes.
        self.byte_sector_size = self.config.get('sector_size', 50000) // 8
        self.storage_map = {}       # sector address -> sector record (local copy)
        self.file_metadata = {}     # metadata of the most recently ingested file
        self.distribution_map = {}  # sector address -> {'primary': ..., 'backup': ...}
        # Statistics collected while processing a file
        self.stats = {
            'original_size': 0,
            'compressed_size': 0,
            'encrypted_size': 0,
            'total_sectors': 0,
            'compression_ratio': 0,
            'processing_time': 0
        }

    def log(self, message: str, level: str = "INFO"):
        """Print a timestamped log line (suppressed when verbose is off)."""
        if self.config.get('verbose', True):
            timestamp = datetime.now().strftime("%H:%M:%S")
            print(f"[{timestamp}] [{level}] {message}")

    def progress_bar(self, current: int, total: int, prefix: str = "", length: int = 40):
        """Render an in-place textual progress bar on stdout."""
        if not self.config.get('verbose', True):
            return
        percent = 100 * (current / total) if total > 0 else 0
        filled = int(length * current // total)
        bar = '█' * filled + '░' * (length - filled)
        sys.stdout.write(f'\r{prefix} |{bar}| {percent:.1f}% ({current}/{total})')
        sys.stdout.flush()
        if current >= total:
            print()  # finish the line once complete

    def calculate_checksum(self, data: bytes) -> str:
        """Return the SHA-256 hex digest of `data`."""
        return hashlib.sha256(data).hexdigest()

    def compress_data(self, data: bytes) -> bytes:
        """Compress `data` with zlib (no-op when compression is disabled).

        Side effect: records the compression ratio in self.stats.
        """
        if not self.config.get('compression', True):
            return data
        compressed = zlib.compress(data, level=9)
        compression_ratio = len(compressed) / len(data) if len(data) > 0 else 1
        self.stats['compression_ratio'] = compression_ratio
        return compressed

    def decompress_data(self, data: bytes) -> bytes:
        """Inverse of compress_data; passes non-zlib data through unchanged."""
        if not self.config.get('compression', True):
            return data
        try:
            return zlib.decompress(data)
        except zlib.error:
            # Not valid zlib data (e.g. stored uncompressed); return as-is
            # rather than aborting the reconstruction.
            return data

    def read_file(self, filepath: str) -> Tuple[bytes, str]:
        """Read `filepath` fully; return (data, SHA-256 checksum)."""
        self.log(f"Reading file: {filepath}")
        with open(filepath, 'rb') as f:
            data = f.read()
        checksum = self.calculate_checksum(data)
        file_size = len(data)
        self.stats['original_size'] = file_size
        self.log(f"File size: {file_size:,} bytes ({file_size*8:,} bits)")
        self.log(f"SHA-256: {checksum[:32]}...")
        return data, checksum

    def create_sectors(self, data: bytes, filename: str) -> List[bytes]:
        """Zero-pad `data` to a sector multiple and split it into sectors.

        Also records file metadata (original size, sector count, config) so
        the padding can be stripped again during reconstruction.
        """
        self.log(f"Creating sectors ({self.byte_sector_size:,} bytes each)...")
        original_len = len(data)
        # Pad with NUL bytes so the data divides evenly into sectors.
        if len(data) % self.byte_sector_size != 0:
            padding_len = self.byte_sector_size - (len(data) % self.byte_sector_size)
            data = data + b'\x00' * padding_len
        sectors = []
        total_sectors = len(data) // self.byte_sector_size
        for i in range(total_sectors):
            start = i * self.byte_sector_size
            sectors.append(data[start:start + self.byte_sector_size])
            if self.config.get('verbose', True):
                self.progress_bar(i + 1, total_sectors, "Sectorizing")
        self.stats['total_sectors'] = len(sectors)
        # Metadata needed later by recover_and_reconstruct().
        self.file_metadata = {
            'filename': filename,
            'original_size': original_len,
            'padded_size': len(data),
            'num_sectors': len(sectors),
            'sector_size': self.byte_sector_size,
            'timestamp': datetime.now().isoformat(),
            'config': self.config
        }
        self.log(f"✓ Created {len(sectors)} sectors")
        return sectors

    def encrypt_and_encode_sectors(self, sectors: List[bytes]) -> Dict[str, dict]:
        """Encrypt each sector, add error correction, and assign an address.

        The storage address ("Disk_<pi digit>/sector_<idx>.dat") is derived
        from the pi digit at the sector's index. NOTE(review): the stream
        cipher restarts at the same seed for every sector, so the keystream
        is reused across sectors (demo-grade obfuscation only).
        """
        self.log("Encrypting and adding error correction...")
        encrypted_sectors = {}
        total = len(sectors)
        for idx, sector in enumerate(sectors):
            # Encrypt (optional)
            if self.config.get('encryption', True):
                encrypted = self.cipher.encrypt(sector)
            else:
                encrypted = sector
            # Add error correction (optional)
            if self.config.get('error_correction', True):
                encoded = self.rs_codec.encode(encrypted)
            else:
                encoded = encrypted
            # Determine storage location using pi
            pi_digit = self.pi_gen.get_digit(idx)
            disk_id = f"Disk_{pi_digit}"
            sector_address = f"{disk_id}/sector_{idx:06d}.dat"
            encrypted_sectors[sector_address] = {
                'data': encoded,
                'original_index': idx,
                'pi_digit': pi_digit,
                'disk_id': disk_id,
                'size': len(encoded)
            }
            if self.config.get('verbose', True):
                self.progress_bar(idx + 1, total, "Encrypting")
        self.log(f"✓ Encoded {len(encrypted_sectors)} sectors")
        return encrypted_sectors

    def scatter_sectors(self, sectors: Dict[str, dict]):
        """Distribute sectors across the simulated network (plus local copy)."""
        self.log(f"Distributing across {self.config.get('network_nodes', 5)} nodes...")
        self.distribution_map = self.network.distribute(sectors)
        self.storage_map = sectors  # keep a full local copy of every record
        self.log(f"✓ Distributed to {len(self.distribution_map)} locations")
        # Report how many primary copies landed on each node.
        node_counts = {}
        for sector_id, dist_info in self.distribution_map.items():
            primary = dist_info['primary']
            node_counts[primary] = node_counts.get(primary, 0) + 1
        self.log("Distribution across nodes:")
        for node, count in sorted(node_counts.items()):
            self.log(f" {node}: {count} sectors")

    def simulate_data_loss(self, loss_percentage: float = 10) -> Tuple[Dict, List]:
        """Randomly delete `loss_percentage`% of sectors from the network.

        NOTE(review): this removes every replica (primary AND backup) of the
        chosen sectors, so they are unrecoverable afterwards — it simulates
        total loss, not a single-node failure.
        """
        self.log(f"⚠️ Simulating {loss_percentage}% data loss...")
        all_sectors = self.network.get_all_sectors()
        total_sectors = len(all_sectors)
        num_lost = int(total_sectors * loss_percentage / 100)
        # Randomly select sectors to lose
        lost_keys = random.sample(list(all_sectors.keys()), num_lost)
        # Remove every copy from the network
        for sector_id in lost_keys:
            for node_name in self.network.nodes:
                if sector_id in self.network.nodes[node_name]:
                    del self.network.nodes[node_name][sector_id]
        remaining = self.network.get_all_sectors()
        self.log(f" Lost: {len(lost_keys)} sectors")
        self.log(f" Remaining: {len(remaining)} sectors")
        return remaining, lost_keys

    @staticmethod
    def _sector_index(sector_id: str, sector_info: dict) -> int:
        """Best-effort original index of a sector record, for ordering.

        Records created by encrypt_and_encode_sectors carry 'original_index',
        but records fetched back from network nodes do not — for those, parse
        the index out of the ".../sector_NNNNNN.dat" address. (The previous
        code defaulted missing keys to 0, which left multi-sector files in
        node-iteration order and scrambled the reconstruction.)
        """
        if 'original_index' in sector_info:
            return sector_info['original_index']
        try:
            return int(sector_id.rsplit('_', 1)[-1].split('.', 1)[0])
        except ValueError:
            return 0

    def recover_and_reconstruct(self, available_sectors: Dict = None) -> bytes:
        """Reassemble the byte stream from available sectors, using backups.

        Returns the reconstructed (still compressed, if enabled) data with
        the sector padding stripped.
        """
        self.log("🔨 Recovering and reconstructing...")
        if available_sectors is None:
            available_sectors = self.network.get_all_sectors()
        # Check for missing sectors
        total_expected = self.file_metadata.get('num_sectors', 0)
        available_count = len(available_sectors)
        missing_count = total_expected - available_count
        if missing_count > 0:
            self.log(f" Missing {missing_count} sectors, attempting recovery...")
            # Pull every surviving replica (primary or backup) off the nodes.
            all_sectors = {}
            for node_name, node_data in self.network.nodes.items():
                for sector_id, sector_info in node_data.items():
                    if sector_id not in all_sectors:
                        all_sectors[sector_id] = sector_info
            available_sectors = all_sectors
        # Reassemble in original sector order (see _sector_index).
        sorted_sectors = sorted(
            available_sectors.items(),
            key=lambda item: self._sector_index(item[0], item[1])
        )
        reconstructed_data = b''
        for sector_id, sector_info in sorted_sectors:
            sector_data = sector_info['data']
            # Remove error correction
            if self.config.get('error_correction', True):
                decoded = self.rs_codec.decode(sector_data)
            else:
                decoded = sector_data
            # Decrypt
            if self.config.get('encryption', True):
                decrypted = self.cipher.decrypt(decoded)
            else:
                decrypted = decoded
            reconstructed_data += decrypted
        # Strip the zero padding added by create_sectors
        original_size = self.file_metadata.get('original_size', len(reconstructed_data))
        reconstructed_data = reconstructed_data[:original_size]
        self.log(f"✓ Reconstructed {len(reconstructed_data):,} bytes")
        return reconstructed_data

    def verify_integrity(self, data: bytes, original_checksum: str) -> bool:
        """Compare the SHA-256 of `data` against the original checksum."""
        self.log("✅ Verifying integrity...")
        reconstructed_checksum = self.calculate_checksum(data)
        self.log(f" Original: {original_checksum}")
        self.log(f" Reconstructed: {reconstructed_checksum}")
        if reconstructed_checksum == original_checksum:
            self.log("🎉 SUCCESS! Checksum MATCH!")
            return True
        else:
            self.log("❌ FAILURE! Checksum mismatch!")
            return False

    def save_file(self, data: bytes, output_path: str):
        """Write the reconstructed bytes to `output_path`."""
        self.log(f"💾 Saving to: {output_path}")
        with open(output_path, 'wb') as f:
            f.write(data)
        file_size = os.path.getsize(output_path)
        self.log(f"✓ Saved {file_size:,} bytes")

    def process_file(self, input_path: str, output_path: str = None,
                     simulate_loss: float = 0) -> bool:
        """Complete pipeline: Read → Process → Scatter → Recover → Verify.

        Returns True when the reconstructed file's checksum matches the
        original. Exceptions are caught, logged, and reported as failure.
        """
        start_time = time.time()
        if output_path is None:
            output_path = "reconstructed_" + os.path.basename(input_path)
        print("\n" + "="*70)
        print("🥧 Pi-RAP FILE SYSTEM v2.0 - COMPLETE EDITION")
        print("="*70)
        print(f"Input: {input_path}")
        print(f"Output: {output_path}")
        print(f"Config: Compression={self.config.get('compression')}, " +
              f"Encryption={self.config.get('encryption')}, " +
              f"Error Correction={self.config.get('error_correction')}")
        print("="*70)
        try:
            # Step 1: Read file
            original_data, checksum = self.read_file(input_path)
            # Step 2: Compress
            if self.config.get('compression', True):
                self.log("Compressing data...")
                compressed_data = self.compress_data(original_data)
                self.stats['compressed_size'] = len(compressed_data)
                self.log(f"✓ Compressed: {len(original_data):,} → {len(compressed_data):,} bytes " +
                         f"({self.stats['compression_ratio']*100:.1f}%)")
            else:
                compressed_data = original_data
            # Step 3: Create sectors
            sectors = self.create_sectors(compressed_data, input_path)
            # Step 4: Encrypt and encode
            encrypted_sectors = self.encrypt_and_encode_sectors(sectors)
            self.stats['encrypted_size'] = sum(s['size'] for s in encrypted_sectors.values())
            # Step 5: Scatter across network
            self.scatter_sectors(encrypted_sectors)
            # Step 6: Simulate data loss (optional)
            if simulate_loss > 0:
                available_sectors, lost = self.simulate_data_loss(simulate_loss)
            else:
                available_sectors = self.network.get_all_sectors()
            # Step 7: Recover and reconstruct
            reconstructed_compressed = self.recover_and_reconstruct(available_sectors)
            # Step 8: Decompress
            if self.config.get('compression', True):
                self.log("Decompressing...")
                reconstructed_data = self.decompress_data(reconstructed_compressed)
            else:
                reconstructed_data = reconstructed_compressed
            # Step 9: Verify
            success = self.verify_integrity(reconstructed_data, checksum)
            # Step 10: Save
            if success:
                self.save_file(reconstructed_data, output_path)
            # Calculate statistics
            self.stats['processing_time'] = time.time() - start_time
            # Print summary
            print("\n" + "="*70)
            print("📊 PROCESSING SUMMARY")
            print("="*70)
            print(f"Original size: {self.stats['original_size']:,} bytes")
            print(f"Compressed size: {self.stats['compressed_size']:,} bytes")
            print(f"Total sectors: {self.stats['total_sectors']:,}")
            print(f"Compression ratio: {self.stats['compression_ratio']*100:.2f}%")
            print(f"Processing time: {self.stats['processing_time']:.3f} seconds")
            if self.stats['processing_time'] > 0:
                throughput = self.stats['original_size'] / self.stats['processing_time'] / 1024 / 1024
                print(f"Throughput: {throughput:.2f} MB/s")
            print("="*70)
            if success:
                print("✅ PROCESSING COMPLETE - SUCCESS!")
            else:
                print("❌ PROCESSING COMPLETE - FAILED!")
            print("="*70 + "\n")
            return success
        except Exception as e:
            self.log(f"ERROR: {str(e)}", "ERROR")
            import traceback
            traceback.print_exc()
            return False
def create_test_file(filepath: str, size_kb: int = 100):
    """Create a test file of exactly size_kb KB that looks like a JPEG.

    The file carries a 10-byte JFIF header and a 2-byte end-of-image footer
    with zero padding in between. (The previous version padded with
    size_kb*1024 - 20 zeros, leaving the file 8 bytes short of the
    advertised size.)
    """
    print(f"Creating test file: {filepath} ({size_kb} KB)")
    header = b'\xFF\xD8\xFF\xE0\x00\x10JFIF'  # JPEG/JFIF header (10 bytes)
    footer = b'\xFF\xD9'                      # JPEG end-of-image marker
    body_len = max(size_kb * 1024 - len(header) - len(footer), 0)
    with open(filepath, 'wb') as f:
        f.write(header + b'\x00' * body_len + footer)
    print(f"✓ Test file created")
def main():
    """Command-line entry point: parse options and run the Pi-RAP pipeline."""
    import argparse
    parser = argparse.ArgumentParser(
        description='Pi-RAP: Pi-based Reconstruction & Archival Protocol',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python pi_rap_complete.py -i myfile.jpg
python pi_rap_complete.py -i myfile.jpg --loss 15
python pi_rap_complete.py -i myfile.jpg --no-compression --no-encryption
python pi_rap_complete.py --test
"""
    )
    parser.add_argument('-i', '--input', help='Input file path')
    parser.add_argument('-o', '--output', help='Output file path')
    parser.add_argument('--loss', type=float, default=0,
                        help='Simulate data loss percentage (0-100)')
    parser.add_argument('--sector-size', type=int, default=50000,
                        help='Sector size in bits (default: 50000)')
    parser.add_argument('--nodes', type=int, default=5,
                        help='Number of network nodes (default: 5)')
    parser.add_argument('--seed', type=int, default=0,
                        help='Encryption seed (default: 0)')
    parser.add_argument('--no-compression', action='store_true',
                        help='Disable compression')
    parser.add_argument('--no-encryption', action='store_true',
                        help='Disable encryption')
    parser.add_argument('--no-error-correction', action='store_true',
                        help='Disable error correction')
    parser.add_argument('--test', action='store_true',
                        help='Run with test file')
    parser.add_argument('--quiet', action='store_true',
                        help='Suppress verbose output')
    args = parser.parse_args()
    # Translate CLI flags into the file-system configuration.
    config = {
        'sector_size': args.sector_size,
        'compression': not args.no_compression,
        'encryption': not args.no_encryption,
        'error_correction': not args.no_error_correction,
        'network_nodes': args.nodes,
        'encryption_seed': args.seed,
        'verbose': not args.quiet
    }
    pirap = PiRAPFileSystem(config)
    # --test generates a dummy JPEG and processes it instead of user input.
    if args.test:
        test_file = "test_image.jpg"
        create_test_file(test_file, size_kb=500)
        args.input = test_file
        if not args.output:
            args.output = "recovered_test.jpg"
    if args.input:
        success = pirap.process_file(
            input_path=args.input,
            output_path=args.output,
            simulate_loss=args.loss
        )
        sys.exit(0 if success else 1)
    else:
        parser.print_help()
        sys.exit(1)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()