FEATURE: Implement major enhancements for performance and usability
All checks were successful
Publish Python Package to PyPI / deploy (push) Successful in 15s

This commit introduces several key improvements:

- Parallel Processing: Utilizes `concurrent.futures.ThreadPoolExecutor`
  to process multiple video files simultaneously, significantly
  speeding up batch operations. A new `--jobs` flag allows
  customization of worker threads.

- Progress Bar: Integrates `tqdm` to display a real-time progress
  bar, providing users with feedback on the status, ETA, and
  speed of the transcoding process.

- Dry Run Mode: Adds a `--dry-run` command-line flag. When active,
  the script analyzes files and reports intended actions (transcode,
  copy, skip) without making any actual changes to the files,
  allowing users to preview operations.

- Graceful Exit (Ctrl+C): Implements robust handling of
  KeyboardInterrupt. FFmpeg now writes to temporary files (`.tmp`),
  which are only renamed upon successful completion. If the process
  is interrupted or an error occurs, these temporary files are
  automatically cleaned up, preventing corrupt or partial output.

- Dependencies: Adds `tqdm` to `install_requires` in `setup.cfg`.
- Version: Bumps project version to 0.3.0 to reflect these
  significant feature additions.

FIX: Actually Skip video files where no transcoding is needed.
This commit is contained in:
2025-06-05 21:18:30 -04:00
parent c233629cb9
commit 5611d76e69
3 changed files with 274 additions and 171 deletions

View File

@ -1,18 +1,28 @@
import subprocess
import concurrent.futures
import os
import shutil
import argparse
import json
import threading
from functools import partial
from tqdm import tqdm
def get_stream_info(filepath: str, stream_type: str = "audio") -> list[dict]:
# Global lock for TQDM writes to prevent interleaving from multiple threads
tqdm_lock = threading.Lock()
SUPPORTED_EXTENSIONS = (".mkv", ".mp4")
def get_stream_info(filepath: str, stream_type: str = "audio") -> tuple[list[dict], list[str]]:
"""
Retrieves details for specified stream types (audio, video, subtitle) in a file.
For audio, returns list of dicts with 'index', 'codec_name', 'channels', 'language'.
For video/subtitle, returns list of dicts with 'index', 'codec_name'.
"""
logs = []
if not shutil.which("ffprobe"):
print(f" ⚠️ Warning: ffprobe is missing. Cannot get {stream_type} stream info for '{os.path.basename(filepath)}'.")
return []
logs.append(f" ⚠️ Warning: ffprobe is missing. Cannot get {stream_type} stream info for '{os.path.basename(filepath)}'.")
return [], logs
select_streams_option = {
"audio": "a",
@ -32,9 +42,9 @@ def get_stream_info(filepath: str, stream_type: str = "audio") -> list[dict]:
)
if process.returncode != 0:
# Non-critical error for this function, main processing will decide to skip/fail
return []
return [], logs
if not process.stdout.strip():
return [] # No streams of the selected type found
return [], logs # No streams of the selected type found
data = json.loads(process.stdout)
streams_details = []
@ -49,43 +59,34 @@ def get_stream_info(filepath: str, stream_type: str = "audio") -> list[dict]:
streams_details.append(detail)
return streams_details
except json.JSONDecodeError:
print(f" ⚠️ Warning: Failed to decode ffprobe JSON for {stream_type} streams in '{os.path.basename(filepath)}'.")
return []
logs.append(f" ⚠️ Warning: Failed to decode ffprobe JSON for {stream_type} streams in '{os.path.basename(filepath)}'.")
return [], logs
except Exception as e:
print(f" ⚠️ Error getting {stream_type} stream info for '{os.path.basename(filepath)}': {e}")
return []
logs.append(f" ⚠️ Error getting {stream_type} stream info for '{os.path.basename(filepath)}': {e}")
return [], logs
def process_file_with_ffmpeg(
input_filepath: str,
output_dir_for_file: str | None,
final_output_filepath: str | None,
audio_bitrate: str,
audio_processing_ops: list[dict] # [{'index':X, 'op':'transcode'/'copy', 'lang':'eng'}]
) -> str | None:
) -> tuple[bool, list[str]]:
"""
Processes a single video file using ffmpeg with detailed stream mapping.
Processes a single video file using ffmpeg, writing to a temporary file first.
"""
logs = []
if not shutil.which("ffmpeg"):
print(" 🚨 Error: ffmpeg is not installed or not found.") # Should be caught earlier too
return None
logs.append(" 🚨 Error: ffmpeg is not installed or not found.")
return False, logs
base_filename = os.path.basename(input_filepath)
name, ext = os.path.splitext(base_filename)
output_filename = f"{name}_eac3{ext}" # Suffix remains as per original request
output_filename = f"{name}_eac3{ext}"
if output_dir_for_file:
if not os.path.isdir(output_dir_for_file):
try:
os.makedirs(output_dir_for_file, exist_ok=True)
except OSError as e:
print(f" 🚨 Error creating output directory '{output_dir_for_file}': {e}")
return None
final_output_filepath = os.path.join(output_dir_for_file, output_filename)
else:
final_output_filepath = os.path.join(os.path.dirname(input_filepath), output_filename)
if os.path.abspath(input_filepath) == os.path.abspath(final_output_filepath):
print(f" ⚠️ Warning: Input and output file paths are identical ('{input_filepath}'). Skipping.")
return None
# FFMpeg will write to a temporary file, which we will rename upon success
temp_output_filepath = final_output_filepath + ".tmp"
base_filename = os.path.basename(input_filepath)
output_filename = os.path.basename(final_output_filepath)
ffmpeg_cmd = ["ffmpeg", "-i", input_filepath]
map_operations = []
@ -98,25 +99,17 @@ def process_file_with_ffmpeg(
# Map Audio Streams based on operations
for op_details in audio_processing_ops:
input_stream_map_specifier = f"0:{op_details['index']}" # Map by original ffprobe index
map_operations.extend(["-map", input_stream_map_specifier])
map_operations.extend(["-map", f"0:{op_details['index']}"])
if op_details['op'] == 'transcode':
map_operations.extend([f"-c:a:{output_audio_stream_ffmpeg_idx}", "eac3"])
map_operations.extend([f"-b:a:{output_audio_stream_ffmpeg_idx}", audio_bitrate])
map_operations.extend([f"-ac:a:{output_audio_stream_ffmpeg_idx}", "6"])
map_operations.extend([f"-metadata:s:a:{output_audio_stream_ffmpeg_idx}", f"language={op_details['lang']}"])
map_operations.extend([f"-c:a:{output_audio_stream_ffmpeg_idx}", "eac3", f"-b:a:{output_audio_stream_ffmpeg_idx}", audio_bitrate, f"-ac:a:{output_audio_stream_ffmpeg_idx}", "6", f"-metadata:s:a:{output_audio_stream_ffmpeg_idx}", f"language={op_details['lang']}"])
elif op_details['op'] == 'copy':
map_operations.extend([f"-c:a:{output_audio_stream_ffmpeg_idx}", "copy"])
# 'drop' operations are handled by not including them in audio_processing_ops sent here
output_audio_stream_ffmpeg_idx += 1
ffmpeg_cmd.extend(map_operations)
ffmpeg_cmd.extend(["-y", final_output_filepath])
ffmpeg_cmd.extend(["-y", temp_output_filepath])
# print(f" Executing: {' '.join(ffmpeg_cmd)}") # For debugging complex commands
print(f" ⚙️ Processing: '{base_filename}' -> '{output_filename}'")
logs.append(f" ⚙️ Processing: '{base_filename}' -> '{output_filename}'")
try:
process = subprocess.run(
@ -124,24 +117,146 @@ def process_file_with_ffmpeg(
creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
)
if process.returncode == 0:
if os.path.exists(final_output_filepath) and os.path.getsize(final_output_filepath) > 0:
print(f" ✅ Success: '{os.path.basename(final_output_filepath)}' saved.")
return final_output_filepath
if os.path.exists(temp_output_filepath) and os.path.getsize(temp_output_filepath) > 0:
os.rename(temp_output_filepath, final_output_filepath) # Atomic rename on success
logs.append(f" ✅ Success: '{output_filename}' saved.")
return True, logs
else: # Should not happen if ffmpeg returncode is 0 and no "-f null" output.
print(f" ⚠️ Warning: ffmpeg reported success for '{base_filename}', but output file is missing or empty.")
if process.stderr: print(f" ffmpeg stderr:\n{process.stderr}")
return None
if process.stderr: logs.append(f" ffmpeg stderr:\n{process.stderr.strip()}")
return False, logs
else:
print(f" 🚨 Error during ffmpeg processing for '{base_filename}'. RC: {process.returncode}")
# if process.stdout: print(f" ffmpeg stdout:\n{process.stdout}") # Usually not much on error
if process.stderr: print(f" ffmpeg stderr:\n{process.stderr.strip()}")
if os.path.exists(final_output_filepath):
try: os.remove(final_output_filepath)
except OSError: pass
return None
logs.append(f" 🚨 Error during ffmpeg processing for '{base_filename}'. RC: {process.returncode}")
if process.stderr: logs.append(f" ffmpeg stderr:\n{process.stderr.strip()}")
return False, logs
except Exception as e:
print(f" 🚨 An unexpected error occurred during transcoding of '{base_filename}': {e}")
return None
logs.append(f" 🚨 An unexpected error occurred during transcoding of '{base_filename}': {e}")
return False, logs
def process_single_file(filepath: str, args: argparse.Namespace, input_path_abs: str) -> str:
"""
Analyzes and processes a single file, managing temporary files for graceful exit.
"""
file_specific_logs = []
# Determine a display name relative to the initial input path for cleaner logs
if os.path.isdir(input_path_abs):
display_name = os.path.relpath(filepath, input_path_abs)
else:
display_name = os.path.basename(filepath)
file_specific_logs.append(f"▶️ Checking: '{display_name}'")
target_languages = [lang.strip().lower() for lang in args.languages.split(',') if lang.strip()]
audio_streams_details = get_stream_info(filepath, "audio")
audio_ops_for_ffmpeg = []
if not audio_streams_details:
file_specific_logs.append(" No audio streams found in this file.")
else:
for stream in audio_streams_details:
lang = stream['language']
op_to_perform = None
channels_info = f"{stream.get('channels')}ch" if stream.get('channels') is not None else "N/Ach"
codec_name = stream.get('codec_name', 'unknown')
if lang in target_languages:
is_5_1 = stream.get('channels') == 6
is_not_ac3_eac3 = codec_name not in ['ac3', 'eac3']
if is_5_1 and is_not_ac3_eac3:
op_to_perform = 'transcode'
file_specific_logs.append(f" 🔈 Will transcode: Audio stream #{stream['index']} ({lang}, {channels_info}, {codec_name})")
else:
op_to_perform = 'copy'
reason_parts = [f"already {codec_name}" if codec_name in ['ac3', 'eac3'] else None, f"not 5.1 ({channels_info})" if stream.get('channels') != 6 else None]
reason = ", ".join(filter(None, reason_parts)) or "meets other criteria for copying"
file_specific_logs.append(f" 🔈 Will copy: Audio stream #{stream['index']} ({lang}, {channels_info}, {codec_name}) - Reason: {reason}")
else:
file_specific_logs.append(f" 🔈 Will drop: Audio stream #{stream['index']} ({lang}, {channels_info}, {codec_name}) - Not a target language.")
if op_to_perform:
audio_ops_for_ffmpeg.append({'index': stream['index'], 'op': op_to_perform, 'lang': lang})
# First, check if there are any operations at all for target languages
if not audio_ops_for_ffmpeg:
file_specific_logs.append(f" ⏭️ Skipping '{display_name}': No target audio streams to process (copy/transcode).")
with tqdm_lock:
for log_msg in file_specific_logs:
tqdm.write(log_msg)
return "skipped_no_ops"
needs_transcode = any(op['op'] == 'transcode' for op in audio_ops_for_ffmpeg)
if not needs_transcode:
file_specific_logs.append(f" ⏭️ Skipping '{display_name}': All target audio operations are 'copy'; no transcoding required.")
with tqdm_lock:
for log_msg in file_specific_logs:
tqdm.write(log_msg)
return "skipped_no_transcode"
# Determine final output path
name, ext = os.path.splitext(os.path.basename(filepath))
output_filename = f"{name}_eac3{ext}"
output_dir_for_this_file = os.path.dirname(filepath) # Default to same directory
if args.output_directory_base: # Input was a folder
if os.path.isdir(input_path_abs):
relative_dir = os.path.relpath(os.path.dirname(filepath), start=input_path_abs)
output_dir_for_this_file = os.path.join(args.output_directory_base, relative_dir) if relative_dir != "." else args.output_directory_base
else: # Input was a single file
output_dir_for_this_file = args.output_directory_base
final_output_filepath = os.path.join(output_dir_for_this_file, output_filename)
# Check for identical paths before starting
if os.path.abspath(filepath) == os.path.abspath(final_output_filepath):
file_specific_logs.append(f" ⚠️ Warning: Input and output file paths are identical ('{filepath}'). Skipping.")
with tqdm_lock:
for log_msg in file_specific_logs:
tqdm.write(log_msg)
return "skipped_identical_path"
if args.dry_run:
file_specific_logs.append(f" DRY RUN: Would process '{display_name}'. No changes will be made.")
with tqdm_lock:
for log_msg in file_specific_logs:
tqdm.write(log_msg)
# We return 'processed' to indicate it *would* have been processed
return "processed"
# Ensure output directory exists before processing
if not os.path.isdir(output_dir_for_this_file):
try:
os.makedirs(output_dir_for_this_file, exist_ok=True)
except OSError as e:
file_specific_logs.append(f" 🚨 Error creating output directory '{output_dir_for_this_file}': {e}")
with tqdm_lock:
for log_msg in file_specific_logs:
tqdm.write(log_msg)
return "failed"
temp_filepath = final_output_filepath + ".tmp"
final_status = "failed"
try:
success, ffmpeg_logs = process_file_with_ffmpeg(
filepath,
final_output_filepath,
args.audio_bitrate,
audio_ops_for_ffmpeg
)
file_specific_logs.extend(ffmpeg_logs)
return "processed" if success else "failed"
finally:
# This block will run whether the try block succeeded, failed, or was interrupted.
if os.path.exists(temp_filepath):
try:
os.remove(temp_filepath)
except OSError as e:
file_specific_logs.append(f" 🚨 Error cleaning up temp file '{temp_filepath}': {e}")
with tqdm_lock: # Print all logs for this file at the very end of its processing
for log_msg in file_specific_logs:
tqdm.write(log_msg)
return final_status
def main():
@ -183,9 +298,23 @@ def main():
dest="languages",
default="eng,jpn"
)
parser.add_argument(
"-j", "--jobs",
type=int,
default=os.cpu_count(), # Default to the number of CPU cores
help=f"Number of files to process in parallel. Defaults to the number of CPU cores on your system ({os.cpu_count()})."
)
parser.add_argument(
"--dry-run",
action="store_true", # Makes it a flag, e.g., --dry-run
help="Analyze files and report actions without executing ffmpeg."
)
args = parser.parse_args()
target_languages = [lang.strip().lower() for lang in args.languages.split(',') if lang.strip()]
if args.dry_run:
print("--- DRY RUN MODE ENABLED: No files will be modified. ---")
input_path_abs = os.path.abspath(args.input_path)
files_to_process_paths = []
@ -194,7 +323,7 @@ def main():
print(f"📁 Scanning folder: {input_path_abs}")
for root, _, filenames in os.walk(input_path_abs):
for filename in filenames:
if filename.lower().endswith((".mkv", ".mp4")):
if filename.lower().endswith(SUPPORTED_EXTENSIONS):
files_to_process_paths.append(os.path.join(root, filename))
if not files_to_process_paths:
print(" No .mkv or .mp4 files found in the specified folder.")
@ -213,94 +342,51 @@ def main():
print(f"\nFound {len(files_to_process_paths)} file(s) to potentially process...")
# Initialize stats counters
stats = {"processed": 0, "skipped_rules": 0, "failed": 0}
stats = {
"processed": 0,
"skipped_no_ops": 0,
"skipped_no_transcode": 0,
"skipped_identical_path": 0,
"failed": 0
}
for filepath in files_to_process_paths:
# Determine a display name relative to the initial input path for cleaner logs
if os.path.isdir(input_path_abs):
display_name = os.path.relpath(filepath, input_path_abs)
else: # Single file input
display_name = os.path.basename(filepath)
print(f"\n▶️ Checking: '{display_name}'")
try:
with tqdm(total=len(files_to_process_paths), desc="Overall Progress", unit="file", ncols=100, smoothing=0.1, leave=True) as pbar:
with concurrent.futures.ThreadPoolExecutor(max_workers=args.jobs) as executor:
future_to_path = {
executor.submit(partial(process_single_file, args=args, input_path_abs=input_path_abs), filepath): filepath
for filepath in files_to_process_paths
}
audio_streams_details = get_stream_info(filepath, "audio")
audio_ops_for_ffmpeg = [] # List of audio operations for ffmpeg
for future in concurrent.futures.as_completed(future_to_path):
path = future_to_path[future]
try:
status = future.result()
stats[status] += 1
except Exception as exc:
tqdm.write(f"🚨 An unexpected error occurred while processing '{os.path.basename(path)}': {exc}")
stats["failed"] += 1
finally:
pbar.update(1)
if not audio_streams_details:
print(" No audio streams found in this file.")
else:
for stream in audio_streams_details:
lang = stream['language']
op_to_perform = None # Will be 'transcode', 'copy', or None (for drop)
if lang in target_languages:
is_5_1 = stream.get('channels') == 6
is_not_ac3_eac3 = stream.get('codec_name') not in ['ac3', 'eac3']
if is_5_1 and is_not_ac3_eac3:
op_to_perform = 'transcode'
print(f" 🔈 Will transcode: Audio stream #{stream['index']} ({lang}, {stream.get('channels')}ch, {stream.get('codec_name')})")
else:
op_to_perform = 'copy'
reason_parts = []
if stream.get('codec_name') in ['ac3', 'eac3']: reason_parts.append(f"already {stream.get('codec_name')}")
if stream.get('channels') != 6: reason_parts.append(f"not 5.1 ({stream.get('channels')}ch)")
reason = ", ".join(reason_parts) if reason_parts else "meets other criteria for copying"
print(f" 🔈 Will copy: Audio stream #{stream['index']} ({lang}, {stream.get('channels')}ch, {stream.get('codec_name')}) - Reason: {reason}")
else:
# Language is not in the target list, so it will be dropped (no op_to_perform)
print(f" 🔈 Will drop: Audio stream #{stream['index']} ({lang}, {stream.get('channels')}ch, {stream.get('codec_name')}) - Other language.")
if op_to_perform:
audio_ops_for_ffmpeg.append({
'index': stream['index'],
'op': op_to_perform,
'lang': lang # Store for potential metadata setting during transcode
})
if not audio_ops_for_ffmpeg:
print(f" ⏭️ Skipping '{display_name}': No audio streams in the desired languages ({args.languages}) meet criteria for processing. File will not be created.")
stats["skipped_rules"] += 1
continue # Move to the next file in files_to_process_paths
# If we reach here, audio_ops_for_ffmpeg is NOT empty
# Determine the output directory for this specific file
output_dir_for_this_file = None
if args.output_directory_base:
if os.path.isdir(input_path_abs): # Input was a folder
# Replicate source structure from input_path_abs root into output_directory_base
relative_dir_of_file = os.path.relpath(os.path.dirname(filepath), start=input_path_abs)
if relative_dir_of_file == ".": # file is in the root of input_path_abs
output_dir_for_this_file = args.output_directory_base
else:
output_dir_for_this_file = os.path.join(args.output_directory_base, relative_dir_of_file)
else: # Input was a single file, output_directory_base is the direct output dir
output_dir_for_this_file = args.output_directory_base
# If args.output_directory_base is None, output_dir_for_this_file remains None,
# and process_file_with_ffmpeg will save the output alongside the original file.
processed_file_path = process_file_with_ffmpeg(
filepath,
output_dir_for_this_file,
args.audio_bitrate,
audio_ops_for_ffmpeg # This list is guaranteed to be non-empty here
)
if processed_file_path:
stats["processed"] += 1
else:
stats["failed"] += 1
# Detailed error message for the specific file would have been printed by process_file_with_ffmpeg
except KeyboardInterrupt:
print("\n\n🚨 Process interrupted by user. Shutting down gracefully... Any in-progress files have been cleaned up.")
# The 'finally' blocks in each thread will handle cleanup.
# Exiting here.
return
# Print summary of operations
print("\n--- Processing Summary ---")
summary_title = "--- Dry Run Summary ---" if args.dry_run else "--- Processing Summary ---"
processed_label = "Would be processed" if args.dry_run else "Successfully processed"
print(f"\n{summary_title}")
print(f"Total files checked: {len(files_to_process_paths)}")
print(f"Successfully processed: {stats['processed']}")
print(f"Skipped (no qualifying audio ops): {stats['skipped_rules']}")
print(f"Failed to process: {stats['failed']}")
print(f"{processed_label}: {stats['processed']}")
total_skipped = stats['skipped_no_ops'] + stats['skipped_no_transcode'] + stats['skipped_identical_path']
print(f"⏭️ Total Skipped: {total_skipped}")
if total_skipped > 0:
print(f" - No target audio operations: {stats['skipped_no_ops']}")
print(f" - No transcoding required (all copy): {stats['skipped_no_transcode']}")
print(f" - Identical input/output path: {stats['skipped_identical_path']}")
print(f"🚨 Failed to process: {stats['failed']}")
print("--------------------------")
if __name__ == "__main__":
main()