Can anyone run this and see if this Pi-based Reconstruction and Archival Protocol works?

#!/usr/bin/env python3
"""
Pi-RAP: Pi-based Reconstruction & Archival Protocol

Complete file system with error correction, encryption and compression.
Inspired by the reconstruction scene in "No Way Out" (1987).

Features:
  - Pi-based sector distribution
  - Reed-Solomon error correction
  - Pi stream cipher encryption
  - Zlib compression
  - Network distribution simulation
  - Checksum verification
  - Progress tracking
"""

import os
import sys
import zlib
import hashlib
import random
import struct
import json
import time
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional
import math

# Try to import reedsolo for advanced error correction; fall back to a
# simple parity scheme when it is not installed.
try:
    import reedsolo
    REED_SOLOMON_AVAILABLE = True
except ImportError:
    REED_SOLOMON_AVAILABLE = False
    print("Note: Install 'reedsolo' for advanced error correction: pip install reedsolo")

class PiDigitGenerator:
    """Generate the fractional decimal digits of Pi (1, 4, 1, 5, 9, ...)."""

    def __init__(self, n_digits: int = 10000):
        # Pre-compute a pool of digits; positions past the end wrap around.
        self.digits = self._calculate_pi_digits(n_digits)

    def _calculate_pi_digits(self, n_digits: int) -> str:
        """Return the first n_digits fractional digits of Pi as a string.

        Uses Machin's formula pi/4 = 4*arccot(5) - arccot(239) evaluated in
        fixed-point integer arithmetic with guard digits, which is exact for
        arbitrarily many digits. (The previous spigot implementation mixed
        up its carry/term handling and emitted incorrect digits.)
        """
        if n_digits <= 0:
            return ""
        if n_digits < 100:
            # Pre-calculated digits cover small requests cheaply.
            pi_str = "14159265358979323846264338327950288419716939937510"
            pi_str += "58209749445923078164062862089986280348253421170679"
            return pi_str[:n_digits]

        guard = 10  # extra digits to absorb truncation error
        unity = 10 ** (n_digits + guard)

        def arccot(x: int) -> int:
            # arccot(x) = 1/x - 1/(3x^3) + 1/(5x^5) - ...  (fixed-point)
            power = unity // x
            total = 0
            divisor = 1
            sign = 1
            x_squared = x * x
            while power:
                total += sign * (power // divisor)
                power //= x_squared
                divisor += 2
                sign = -sign
            return total

        pi_fixed = 4 * (4 * arccot(5) - arccot(239))
        # Drop the guard digits; the leading character is the integer part "3".
        return str(pi_fixed // 10 ** guard)[1:n_digits + 1]

    def get_digit(self, position: int) -> int:
        """Get the Pi digit at a position (0-indexed, wraps past the end)."""
        return int(self.digits[position % len(self.digits)])

    def get_digits(self, start: int, length: int) -> str:
        """Get a run of `length` consecutive Pi digits starting at `start`."""
        return ''.join(str(self.get_digit(start + i)) for i in range(length))

class PiStreamCipher:
    """Symmetric stream cipher whose keystream is derived from Pi digits."""

    def __init__(self, pi_gen: PiDigitGenerator, seed: int = 0):
        # seed selects the starting offset into the Pi digit stream
        self.pi_gen = pi_gen
        self.seed = seed

    def _generate_keystream(self, length: int) -> bytes:
        """Derive `length` keystream bytes from groups of four Pi digits."""
        stream = bytearray()
        offset = self.seed
        while len(stream) < length:
            group = self.pi_gen.get_digits(offset, 4)
            stream.append(int(group) % 256)
            offset += 4
        return bytes(stream[:length])

    def encrypt(self, data: bytes) -> bytes:
        """XOR `data` against the Pi keystream."""
        key = self._generate_keystream(len(data))
        return bytes(d ^ k for d, k in zip(data, key))

    def decrypt(self, data: bytes) -> bytes:
        """Inverse of encrypt (XOR is its own inverse)."""
        return self.encrypt(data)

class ReedSolomonCodec:
    """Error correction using Reed-Solomon codes, with a parity fallback."""

    def __init__(self, nsym: int = 10):
        """
        nsym: number of error correction symbols appended per sector.
        More symbols give stronger correction at the cost of more overhead.
        """
        self.nsym = nsym
        self.rs = None

        if REED_SOLOMON_AVAILABLE:
            try:
                self.rs = reedsolo.RSCodec(nsym)
            except Exception:  # narrowed from bare except: bad nsym etc.
                self.rs = None

    def encode(self, data: bytes) -> bytes:
        """Append error correction symbols to `data`."""
        if self.rs:
            return self.rs.encode(data)
        # Fallback: append nsym copies of a single parity byte. This can
        # only detect corruption, but keeps sector sizes consistent.
        parity = bytes([sum(data) % 256])
        return data + parity * self.nsym

    def decode(self, data: bytes) -> bytes:
        """Strip error correction symbols, correcting errors when possible."""
        if self.rs:
            try:
                return self.rs.decode(data)[0]
            except reedsolo.ReedSolomonError:
                # Uncorrectable sector (narrowed from a bare except:):
                # return the payload bytes unmodified, best effort.
                return data[:len(data) - self.nsym]
        # Parity fallback: just strip the appended parity bytes.
        return data[:-self.nsym] if self.nsym > 0 else data

class NetworkDistributor:
    """Simulate scattering sectors across a small cluster of storage nodes."""

    def __init__(self, num_nodes: int = 5):
        self.num_nodes = num_nodes
        self.nodes = {f"node_{i}": {} for i in range(num_nodes)}

    def distribute(self, sectors: Dict[str, dict]) -> Dict[str, Dict]:
        """Place each sector on a primary node plus one backup node."""
        placement = {}
        for sector_id, info in sectors.items():
            # The node choice is derived from the sector number embedded
            # in the address ("Disk_X/sector_NNNNNN.dat").
            index = int(sector_id.split('_')[-1].replace('.dat', ''))
            digit = index % 10
            primary = f"node_{digit % self.num_nodes}"
            backup = f"node_{(digit + 1) % self.num_nodes}"

            self.nodes[primary][sector_id] = {'data': info['data'], 'type': 'primary'}
            self.nodes[backup][sector_id] = {'data': info['data'], 'type': 'backup'}
            placement[sector_id] = {'primary': primary, 'backup': backup}
        return placement

    def simulate_node_failure(self, failed_node: str) -> int:
        """Wipe a node's storage; return how many sectors it was holding."""
        node = self.nodes.get(failed_node)
        if node is None:
            return 0
        count = len(node)
        self.nodes[failed_node] = {}
        return count

    def recover_from_backup(self, lost_sectors: List[str]) -> Dict[str, dict]:
        """Promote backup copies of the given sectors to primary status."""
        recovered = {}
        for sector_id in lost_sectors:
            for node_data in self.nodes.values():
                entry = node_data.get(sector_id)
                if entry is not None and entry['type'] == 'backup':
                    entry['type'] = 'primary'
                    recovered[sector_id] = entry['data']
                    break
        return recovered

    def get_all_sectors(self) -> Dict[str, dict]:
        """Collect every primary sector copy across all nodes."""
        collected = {}
        for node_name, node_data in self.nodes.items():
            for sector_id, entry in node_data.items():
                if entry['type'] == 'primary':
                    collected[sector_id] = {'data': entry['data'], 'node': node_name}
        return collected

class PiRAPFileSystem:
    """Main Pi-RAP file system: compress, encrypt, sectorize, scatter, recover."""

    def __init__(self, config: dict = None):
        """config overrides the defaults below; readers use .get() fallbacks."""
        self.config = config or {
            'sector_size': 50000,  # bits
            'compression': True,
            'encryption': True,
            'error_correction': True,
            'encryption_seed': 0,
            'network_nodes': 5,
            'redundancy_level': 2,
            'verbose': True
        }

        # Initialize components
        self.pi_gen = PiDigitGenerator()
        self.cipher = PiStreamCipher(self.pi_gen, self.config.get('encryption_seed', 0))
        self.rs_codec = ReedSolomonCodec(nsym=10 if self.config.get('error_correction') else 0)
        self.network = NetworkDistributor(self.config.get('network_nodes', 5))

        # Sector size is configured in bits; storage operates in bytes.
        self.byte_sector_size = self.config.get('sector_size', 50000) // 8
        self.storage_map = {}       # sector address -> sector record
        self.file_metadata = {}     # populated by create_sectors()
        self.distribution_map = {}  # sector address -> {primary, backup}

        # Statistics updated as a file flows through the pipeline
        self.stats = {
            'original_size': 0,
            'compressed_size': 0,
            'encrypted_size': 0,
            'total_sectors': 0,
            'compression_ratio': 0,
            'processing_time': 0
        }

def log(self, message: str, level: str = "INFO"):
    """Print a timestamped log line unless verbose output is disabled."""
    if not self.config.get('verbose', True):
        return
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{stamp}] [{level}] {message}")

def progress_bar(self, current: int, total: int, prefix: str = "", length: int = 40):
    """Render an in-place textual progress bar on stdout.

    current/total: progress counters; prefix: label text; length: bar width.
    No-op when verbose output is disabled; safe when total == 0.
    """
    if not self.config.get('verbose', True):
        return

    if total > 0:
        percent = 100 * current / total
        filled = length * current // total
    else:
        # Fix: the original computed `length * current // total` without a
        # guard, raising ZeroDivisionError for an empty work list.
        percent = 0
        filled = 0
    bar = '█' * filled + '░' * (length - filled)
    sys.stdout.write(f'\r{prefix} |{bar}| {percent:.1f}% ({current}/{total})')
    sys.stdout.flush()
    if current >= total:
        print()

def calculate_checksum(self, data: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of `data`."""
    digest = hashlib.sha256(data)
    return digest.hexdigest()

def compress_data(self, data: bytes) -> bytes:
    """Zlib-compress `data` (level 9) and record the compression ratio.

    Returns `data` untouched when compression is disabled in the config.
    """
    if not self.config.get('compression', True):
        return data

    packed = zlib.compress(data, level=9)
    ratio = len(packed) / len(data) if data else 1
    self.stats['compression_ratio'] = ratio
    return packed

def decompress_data(self, data: bytes) -> bytes:
    """Zlib-decompress `data`, passing it through when compression is off.

    A corrupt or non-zlib payload is returned unchanged (best effort)
    rather than aborting the reconstruction.
    """
    if not self.config.get('compression', True):
        return data

    try:
        return zlib.decompress(data)
    except zlib.error:
        # Narrowed from a bare `except:` so only genuine decompression
        # failures fall back to returning the raw payload.
        return data

def read_file(self, filepath: str) -> Tuple[bytes, str]:
    """Read a file's bytes, record its size, and return (data, sha256 hex)."""
    self.log(f"Reading file: {filepath}")

    with open(filepath, 'rb') as handle:
        contents = handle.read()

    digest = self.calculate_checksum(contents)
    size = len(contents)
    self.stats['original_size'] = size

    self.log(f"File size: {size:,} bytes ({size*8:,} bits)")
    self.log(f"SHA-256: {digest[:32]}...")

    return contents, digest

def create_sectors(self, data: bytes, filename: str) -> List[bytes]:
    """Split (zero-padded) data into fixed-size sectors and record metadata."""
    self.log(f"Creating sectors ({self.byte_sector_size:,} bytes each)...")

    size = self.byte_sector_size
    original_len = len(data)

    # Zero-pad so the payload divides evenly into whole sectors.
    remainder = len(data) % size
    if remainder:
        data = data + b'\x00' * (size - remainder)

    count = len(data) // size
    sectors = []
    for i in range(count):
        sectors.append(data[i * size:(i + 1) * size])
        if self.config.get('verbose', True):
            self.progress_bar(i + 1, count, "Sectorizing")

    self.stats['total_sectors'] = len(sectors)

    # Metadata needed later to reorder sectors and strip the padding.
    self.file_metadata = {
        'filename': filename,
        'original_size': original_len,
        'padded_size': len(data),
        'num_sectors': len(sectors),
        'sector_size': size,
        'timestamp': datetime.now().isoformat(),
        'config': self.config
    }

    self.log(f"✓ Created {len(sectors)} sectors")

    return sectors

def encrypt_and_encode_sectors(self, sectors: List[bytes]) -> Dict[str, dict]:
    """Encrypt each sector, add error correction, and assign a Pi-based address."""
    self.log("Encrypting and adding error correction...")

    records = {}
    count = len(sectors)

    for index, payload in enumerate(sectors):
        # Optional Pi stream cipher pass
        if self.config.get('encryption', True):
            cipher_text = self.cipher.encrypt(payload)
        else:
            cipher_text = payload

        # Optional Reed-Solomon coding pass
        if self.config.get('error_correction', True):
            coded = self.rs_codec.encode(cipher_text)
        else:
            coded = cipher_text

        # The Pi digit at the sector index picks the (virtual) disk.
        digit = self.pi_gen.get_digit(index)
        disk = f"Disk_{digit}"
        address = f"{disk}/sector_{index:06d}.dat"

        records[address] = {
            'data': coded,
            'original_index': index,
            'pi_digit': digit,
            'disk_id': disk,
            'size': len(coded)
        }

        if self.config.get('verbose', True):
            self.progress_bar(index + 1, count, "Encrypting")

    self.log(f"✓ Encoded {len(records)} sectors")

    return records

def scatter_sectors(self, sectors: Dict[str, dict]):
    """Hand sectors to the network layer and record where each one landed."""
    self.log(f"Distributing across {self.config.get('network_nodes', 5)} nodes...")

    self.distribution_map = self.network.distribute(sectors)
    self.storage_map = sectors  # keep a local copy as well

    self.log(f"✓ Distributed to {len(self.distribution_map)} locations")

    # Tally how many primary copies each node received.
    per_node = {}
    for placement in self.distribution_map.values():
        node = placement['primary']
        per_node[node] = per_node.get(node, 0) + 1

    self.log("Distribution across nodes:")
    for node in sorted(per_node):
        self.log(f"  {node}: {per_node[node]} sectors")

def simulate_data_loss(self, loss_percentage: float = 10) -> Tuple[Dict, List]:
    """Randomly delete a percentage of sectors from the network (for testing).

    Returns (remaining primary sectors, list of lost sector ids). Note that
    every copy of a chosen sector is removed, primary and backup alike.
    """
    self.log(f"⚠️  Simulating {loss_percentage}% data loss...")

    current = self.network.get_all_sectors()
    num_lost = int(len(current) * loss_percentage / 100)

    # Pick the victims at random.
    lost_keys = random.sample(list(current.keys()), num_lost)

    # Delete each chosen sector from every node that holds it.
    for sector_id in lost_keys:
        for node_name in self.network.nodes:
            if sector_id in self.network.nodes[node_name]:
                del self.network.nodes[node_name][sector_id]

    remaining = self.network.get_all_sectors()

    self.log(f"   Lost: {len(lost_keys)} sectors")
    self.log(f"   Remaining: {len(remaining)} sectors")

    return remaining, lost_keys

def recover_and_reconstruct(self, available_sectors: Dict = None) -> bytes:
    """Reassemble the original (still-compressed) byte stream from sectors.

    Pulls backup copies into play when primaries are missing, reorders the
    sectors, undoes error correction and encryption, and strips padding.
    """
    self.log("🔨 Recovering and reconstructing...")

    if available_sectors is None:
        available_sectors = self.network.get_all_sectors()

    # Compare against the sector count recorded at archive time.
    expected = self.file_metadata.get('num_sectors', 0)
    missing = expected - len(available_sectors)

    if missing > 0:
        self.log(f"   Missing {missing} sectors, attempting recovery...")

        # Fall back to every copy on every node; first copy found wins.
        merged = {}
        for node_data in self.network.nodes.values():
            for sector_id, info in node_data.items():
                merged.setdefault(sector_id, info)
        available_sectors = merged

    # Reassemble in the original sector order.
    ordered = sorted(
        available_sectors.items(),
        key=lambda item: item[1].get('original_index', 0)
    )

    pieces = []
    for _sector_id, info in ordered:
        chunk = info['data']

        # Undo error correction coding
        if self.config.get('error_correction', True):
            chunk = self.rs_codec.decode(chunk)

        # Undo encryption
        if self.config.get('encryption', True):
            chunk = self.cipher.decrypt(chunk)

        pieces.append(chunk)
    reconstructed = b''.join(pieces)

    # Strip the zero padding added during sectorization.
    original_size = self.file_metadata.get('original_size', len(reconstructed))
    reconstructed = reconstructed[:original_size]

    self.log(f"✓ Reconstructed {len(reconstructed):,} bytes")

    return reconstructed

def verify_integrity(self, data: bytes, original_checksum: str) -> bool:
    """Compare the reconstruction's SHA-256 against the original's."""
    self.log("✅ Verifying integrity...")

    rebuilt = self.calculate_checksum(data)

    self.log(f"   Original:      {original_checksum}")
    self.log(f"   Reconstructed: {rebuilt}")

    if rebuilt != original_checksum:
        self.log("❌ FAILURE! Checksum mismatch!")
        return False
    self.log("🎉 SUCCESS! Checksum MATCH!")
    return True

def save_file(self, data: bytes, output_path: str):
    """Write the reconstructed bytes to disk and log the resulting size."""
    self.log(f"💾 Saving to: {output_path}")

    with open(output_path, 'wb') as out:
        out.write(data)

    written = os.path.getsize(output_path)
    self.log(f"✓ Saved {written:,} bytes")

def process_file(self, input_path: str, output_path: str = None, 
                 simulate_loss: float = 0) -> bool:
    """Complete pipeline: Read → Process → Scatter → Recover → Verify

    input_path: file to archive and reconstruct.
    output_path: destination for the reconstruction; defaults to
        "reconstructed_<basename of input>".
    simulate_loss: percentage (0-100) of sectors to randomly destroy
        before recovery, to exercise the redundancy machinery.
    Returns True when the reconstructed file's checksum matches the
    original; False on mismatch or on any exception.
    """
    
    start_time = time.time()
    
    if output_path is None:
        output_path = "reconstructed_" + os.path.basename(input_path)
    
    print("\n" + "="*70)
    print("🥧 Pi-RAP FILE SYSTEM v2.0 - COMPLETE EDITION")
    print("="*70)
    print(f"Input:  {input_path}")
    print(f"Output: {output_path}")
    print(f"Config: Compression={self.config.get('compression')}, " +
          f"Encryption={self.config.get('encryption')}, " +
          f"Error Correction={self.config.get('error_correction')}")
    print("="*70)
    
    try:
        # Step 1: Read file
        original_data, checksum = self.read_file(input_path)
        
        # Step 2: Compress
        if self.config.get('compression', True):
            self.log("Compressing data...")
            compressed_data = self.compress_data(original_data)
            self.stats['compressed_size'] = len(compressed_data)
            self.log(f"✓ Compressed: {len(original_data):,} → {len(compressed_data):,} bytes " +
                    f"({self.stats['compression_ratio']*100:.1f}%)")
        else:
            compressed_data = original_data
        
        # Step 3: Create sectors
        sectors = self.create_sectors(compressed_data, input_path)
        
        # Step 4: Encrypt and encode
        encrypted_sectors = self.encrypt_and_encode_sectors(sectors)
        self.stats['encrypted_size'] = sum(s['size'] for s in encrypted_sectors.values())
        
        # Step 5: Scatter across network
        self.scatter_sectors(encrypted_sectors)
        
        # Step 6: Simulate data loss (optional)
        if simulate_loss > 0:
            available_sectors, lost = self.simulate_data_loss(simulate_loss)
        else:
            available_sectors = self.network.get_all_sectors()
        
        # Step 7: Recover and reconstruct
        reconstructed_compressed = self.recover_and_reconstruct(available_sectors)
        
        # Step 8: Decompress
        if self.config.get('compression', True):
            self.log("Decompressing...")
            reconstructed_data = self.decompress_data(reconstructed_compressed)
        else:
            reconstructed_data = reconstructed_compressed
        
        # Step 9: Verify
        success = self.verify_integrity(reconstructed_data, checksum)
        
        # Step 10: Save -- only a verified reconstruction is written out
        if success:
            self.save_file(reconstructed_data, output_path)
        
        # Calculate statistics
        self.stats['processing_time'] = time.time() - start_time
        
        # Print summary
        # NOTE(review): compression_ratio stays 0 when compression is disabled.
        print("\n" + "="*70)
        print("📊 PROCESSING SUMMARY")
        print("="*70)
        print(f"Original size:     {self.stats['original_size']:,} bytes")
        print(f"Compressed size:   {self.stats['compressed_size']:,} bytes")
        print(f"Total sectors:     {self.stats['total_sectors']:,}")
        print(f"Compression ratio: {self.stats['compression_ratio']*100:.2f}%")
        print(f"Processing time:   {self.stats['processing_time']:.3f} seconds")
        if self.stats['processing_time'] > 0:
            throughput = self.stats['original_size'] / self.stats['processing_time'] / 1024 / 1024
            print(f"Throughput:        {throughput:.2f} MB/s")
        print("="*70)
        
        if success:
            print("✅ PROCESSING COMPLETE - SUCCESS!")
        else:
            print("❌ PROCESSING COMPLETE - FAILED!")
        print("="*70 + "\n")
        
        return success
        
    except Exception as e:
        # Top-level boundary: log the failure and signal it via the return
        # value rather than crashing the CLI.
        self.log(f"ERROR: {str(e)}", "ERROR")
        import traceback
        traceback.print_exc()
        return False

def create_test_file(filepath: str, size_kb: int = 100):
    """Create a JPEG-like test file of exactly size_kb kilobytes."""
    print(f"Creating test file: {filepath} ({size_kb} KB)")

    header = b'\xFF\xD8\xFF\xE0\x00\x10JFIF'  # JPEG SOI/APP0 marker bytes
    footer = b'\xFF\xD9'                      # JPEG EOI marker

    data = bytearray()
    data.extend(header)
    # Zero fill sized so the total file is exactly size_kb * 1024 bytes.
    # (The original subtracted a hard-coded 20 and came out 8 bytes over.)
    data.extend(b'\x00' * (size_kb * 1024 - len(header) - len(footer)))
    data.extend(footer)

    with open(filepath, 'wb') as f:
        f.write(data)

    print(f"✓ Test file created")

def main():
    """Command-line entry point: parse flags and run the pipeline."""
    import argparse

    parser = argparse.ArgumentParser(
        description='Pi-RAP: Pi-based Reconstruction & Archival Protocol',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python pi_rap_complete.py -i myfile.jpg
python pi_rap_complete.py -i myfile.jpg --loss 15
python pi_rap_complete.py -i myfile.jpg --no-compression --no-encryption
python pi_rap_complete.py --test
"""
    )

    parser.add_argument('-i', '--input', help='Input file path')
    parser.add_argument('-o', '--output', help='Output file path')
    parser.add_argument('--loss', type=float, default=0,
                        help='Simulate data loss percentage (0-100)')
    parser.add_argument('--sector-size', type=int, default=50000,
                        help='Sector size in bits (default: 50000)')
    parser.add_argument('--nodes', type=int, default=5,
                        help='Number of network nodes (default: 5)')
    parser.add_argument('--seed', type=int, default=0,
                        help='Encryption seed (default: 0)')
    parser.add_argument('--no-compression', action='store_true',
                        help='Disable compression')
    parser.add_argument('--no-encryption', action='store_true',
                        help='Disable encryption')
    parser.add_argument('--no-error-correction', action='store_true',
                        help='Disable error correction')
    parser.add_argument('--test', action='store_true',
                        help='Run with test file')
    parser.add_argument('--quiet', action='store_true',
                        help='Suppress verbose output')

    args = parser.parse_args()

    # Build the file-system configuration from the CLI flags.
    config = {
        'sector_size': args.sector_size,
        'compression': not args.no_compression,
        'encryption': not args.no_encryption,
        'error_correction': not args.no_error_correction,
        'network_nodes': args.nodes,
        'encryption_seed': args.seed,
        'verbose': not args.quiet
    }

    pirap = PiRAPFileSystem(config)

    # --test generates a synthetic input file first.
    if args.test:
        test_file = "test_image.jpg"
        create_test_file(test_file, size_kb=500)
        args.input = test_file
        if not args.output:
            args.output = "recovered_test.jpg"

    # Process the file, or show usage when no input was given.
    if args.input:
        success = pirap.process_file(
            input_path=args.input,
            output_path=args.output,
            simulate_loss=args.loss
        )
        sys.exit(0 if success else 1)
    else:
        parser.print_help()
        sys.exit(1)

# Script entry guard: the pasted version had lost the dunder underscores
# ("if name == main"), which raises NameError at import time.
if __name__ == "__main__":
    main()

1 Like

You’ll get more help if you enclose your code in three backticks on a line by itself. Your code looks like Python, where indentation is critical, and few people have time to re-indent your program on their PC to try to run it. On a US keyboard, the backtick key is in the upper left.


```
this is code
```

1 Like