Reference files
main.py
"""Sign an image with a C2PA manifest using a remote CSC credential.

Usage: python main.py <input_image> <output_image>
"""

import base64
import getpass
import hashlib
import io
import json
import logging
import mimetypes
import sys
import time

from c2pa import Builder, create_signer, SigningAlg

from dtmCSC import sign1, get_credentials_info  # CSC API

# --- Config ---
CERT_CHAIN_PATH = "chain.pem"
SIGNING_ALG = SigningAlg.PS256
TIMESTAMP_URL = "http://timestamp.digicert.com"
# Media type used when the input file name gives no usable hint.
DEFAULT_FORMAT = "image/jpeg"
# Shared by claim_generator_info and every action's softwareAgent.
SOFTWARE_AGENT = {"name": "DigiCert Content Trust Manager", "version": "1.0.0"}

# Load certificate chain
with open(CERT_CHAIN_PATH, "rb") as f:
    CERT_CHAIN = f.read()


def log_step(msg: str):
    """Print a separator banner, log *msg* at INFO level, then pause one second."""
    print("\n" + "=" * 60 + "\n")
    logging.info(f"{msg}")
    time.sleep(1)


def _resolve_author(pin: str) -> tuple[str, str]:
    """Return ``(author_name, author_email)`` for the manifest.

    Values come from ``get_credentials_info``; if no name is obtained the
    user is prompted for one. The e-mail stays an empty string when the
    lookup yields none.
    """
    author_name, author_email = "", ""
    try:
        _creds_info, author_from_cert, email_from_api = get_credentials_info(pin)
        if author_from_cert:
            author_name = author_from_cert.strip()
        if email_from_api:
            author_email = email_from_api.strip()
    except Exception:
        # Best effort: a failed lookup falls through to the manual prompt.
        logging.exception("Failed to get credentials info")

    if not author_name:
        author_name = input(
            "Author name not found from CSC. Enter author name to use: "
        ).strip()
    return author_name, author_email


def _guess_format(image_path: str) -> str:
    """Return the media type guessed from *image_path*, or DEFAULT_FORMAT."""
    guessed, _encoding = mimetypes.guess_type(image_path)
    return guessed or DEFAULT_FORMAT


def _build_manifest(
    image_path: str, media_type: str, author_name: str, author_email: str
) -> dict:
    """Build the manifest definition: a CreativeWork author assertion plus
    ``c2pa.created`` and ``c2pa.signed`` actions."""
    return {
        "title": image_path,
        "format": media_type,
        "claim_generator_info": [dict(SOFTWARE_AGENT)],
        "assertions": [
            {
                "label": "stds.schema-org.CreativeWork",
                "data": {
                    "@context": "https://schema.org",
                    "@type": "CreativeWork",
                    "author": [
                        {
                            "@type": "Person",
                            "name": author_name,
                            "email": author_email,
                        }
                    ],
                },
                "kind": "Json",
            },
            {
                "label": "c2pa.actions",
                "data": {
                    "actions": [
                        {
                            "action": "c2pa.created",
                            "softwareAgent": dict(SOFTWARE_AGENT),
                        },
                        {
                            "action": "c2pa.signed",
                            "softwareAgent": dict(SOFTWARE_AGENT),
                        },
                    ]
                },
                "kind": "Json",
            },
        ],
    }


def _make_csc_signer(pin: str):
    """Return the callback handed to ``create_signer``; it delegates to ``sign1``."""

    def csc_signer(data: bytes, **kwargs):
        # Log the SHA-256 digest (what sign1 sends to CSC), not the raw payload.
        digest_b64 = base64.b64encode(hashlib.sha256(data).digest()).decode("utf-8")
        logging.info(f"Hash prepared for CSC: {digest_b64}")

        sad_response = sign1(data, pin)
        # NOTE(review): dtmCSC.sign1 documents its return value as the SAD
        # token. Confirm these bytes are really what create_signer expects
        # as the signature for `data`.
        if isinstance(sad_response, str):
            return sad_response.encode("utf-8")
        if isinstance(sad_response, dict) and "signedHash" in sad_response:
            return sad_response["signedHash"].encode("utf-8")
        raise ValueError(f"Unexpected SAD response: {sad_response}")

    return csc_signer


def main(image_path: str, output_path: str) -> int:
    """Sign *image_path* and write the signed asset to *output_path*.

    Prompts for the CSC PIN, resolves the author, builds the manifest and
    signs through the CSC-backed callback.

    Returns:
        0 when the signed file was written, 1 when signing failed (the
        error is logged and printed rather than raised).

    Raises:
        ValueError: if the entered PIN is empty.
    """
    # force=True: importing dtmCSC has already called basicConfig, so without
    # it this call would be ignored and the message-only format never applied.
    logging.basicConfig(level=logging.INFO, format="%(message)s", force=True)

    # --- Step 1: Ask for PIN (hidden input) ---
    log_step("Step 1: Enter PIN")
    pin = getpass.getpass("Enter PIN (hidden): ").strip()
    if not pin:
        raise ValueError("PIN cannot be empty")

    # --- Step 2: Fetch author info from CSC credentials API ---
    log_step("Step 2: Fetching author info from CSC")
    author_name, author_email = _resolve_author(pin)
    logging.info(f"Using author: {author_name} ({author_email})")

    # --- Step 3: Build manifest ---
    log_step("Step 3: Building manifest")
    media_type = _guess_format(image_path)
    manifest_json = json.dumps(
        _build_manifest(image_path, media_type, author_name, author_email)
    )

    try:
        # --- Step 4: Load image ---
        log_step("Step 4: Loading input image")
        with open(image_path, "rb") as src:
            img_bytes = src.read()
        builder = Builder(manifest_json)

        # --- Step 5: CSC signer callback ---
        log_step("Step 5: Preparing CSC signer")
        signer = create_signer(
            _make_csc_signer(pin), SIGNING_ALG, CERT_CHAIN, TIMESTAMP_URL
        )

        # --- Step 6: Sign image ---
        log_step("Step 6: Signing image")
        result = io.BytesIO()
        builder.sign(signer, media_type, io.BytesIO(img_bytes), result)

        # --- Step 7: Write signed output ---
        log_step("Step 7: Writing signed output")
        with open(output_path, "wb") as dst:
            dst.write(result.getvalue())
        logging.info(f"Signed file successfully written to {output_path}")
    except Exception as e:
        logging.error("Signing failed", exc_info=e)
        print(f"Error: {e}")
        return 1
    return 0


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(f"Usage: python {sys.argv[0]} <input_image> <output_image>")
        sys.exit(2)
    sys.exit(main(sys.argv[1], sys.argv[2]))
dtmCSC.py
"""Client helpers for the DigiCert CSC (Cloud Signature Consortium) API."""

import base64
import hashlib
import logging
import time

import requests
from cryptography import x509
from cryptography.x509.oid import NameOID

# Configure logging
logging.basicConfig(level=logging.INFO)

CSC_BASE_URL = "https://clientauth.demo.one.digicert.com/documentmanager/csc/v1"
CLIENT_CERT = "cert.pem"
CLIENT_KEY = "key.pem"
CREDENTIAL_ID = "basic_np-14-08-2025-11-01-44-165"  # Replace with your real credential ID
REQUEST_TIMEOUT = 10  # seconds, applied to every CSC request


def _announce(msg: str) -> None:
    """Print a separator banner, log *msg* at INFO level, then pause one second."""
    print("\n" + "=" * 60 + "\n")
    logging.info(msg)
    time.sleep(1)


def _csc_post(endpoint: str, payload: dict) -> requests.Response:
    """POST *payload* as JSON to ``CSC_BASE_URL/<endpoint>`` using the client
    certificate, and return the raw response (status is not checked here)."""
    return requests.post(
        f"{CSC_BASE_URL}/{endpoint}",
        json=payload,
        cert=(CLIENT_CERT, CLIENT_KEY),
        headers={"Content-Type": "application/json"},
        timeout=REQUEST_TIMEOUT,
    )


def extract_author_from_cert(cert_b64: str) -> str | None:
    """
    Extracts Common Name (CN) from a base64 DER certificate string.

    Returns None when the certificate has no CN or cannot be decoded or
    parsed; the failure is logged, not raised.
    """
    try:
        cert_der = base64.b64decode(cert_b64)
        cert = x509.load_der_x509_certificate(cert_der)
        cn_attr = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        if cn_attr:
            return cn_attr[0].value
    except Exception as e:
        logging.error("Failed to extract CN from certificate", exc_info=e)
    return None


def get_credentials_info(pin: str):
    """
    Fetches author details from the DigiCert CSC credentials info API.

    Returns a tuple: (raw_data, author_name, author_email). The author name
    is the CN of the first certificate in the response, or None if there is
    none. The e-mail is always an empty string. On any failure the error is
    logged and ``({}, None, "")`` is returned instead of raising.
    """
    try:
        _announce("Requesting credentials info from CSC API...")

        resp = _csc_post(
            "credentials/info", {"PIN": pin, "credentialID": CREDENTIAL_ID}
        )
        resp.raise_for_status()
        data = resp.json()

        # Try extracting from certificate. `or` guards against explicit nulls
        # so a sparse response does not discard the payload we already have.
        certs = (data.get("cert") or {}).get("certificates") or []
        author_name = extract_author_from_cert(certs[0]) if certs else None

        # CSC usually doesn't return email - keep blank
        author_email = ""

        logging.info("Credentials info successfully retrieved")
        time.sleep(1)
        return data, author_name, author_email
    except Exception as e:
        logging.error("Failed to fetch credentials info", exc_info=e)
        return {}, None, ""


def get_sad(hash_b64: str, pin: str) -> str:
    """
    Request a Signature Activation Data (SAD) token from CSC for signing a hash.

    Raises:
        Exception: if the endpoint answers with a non-200 status.
        RuntimeError: if the response body has no "SAD" field.
    """
    _announce("Requesting SAD from CSC API...")

    payload = {
        "credentialID": CREDENTIAL_ID,
        "PIN": pin,
        "hash": [hash_b64],
        "hashAlgo": "SHA256",
        "numSignatures": 1,
    }
    resp = _csc_post("credentials/authorize", payload)
    if resp.status_code != 200:
        logging.error(f"Failed to get SAD: {resp.status_code} {resp.text}")
        raise Exception(f"SAD request failed: {resp.text}")

    data = resp.json()
    sad = data.get("SAD") if isinstance(data, dict) else None
    if not sad:
        # Fail with a readable message instead of a bare KeyError.
        raise RuntimeError(f"SAD missing from CSC response: {resp.text}")

    logging.info("SAD successfully received")
    time.sleep(1)
    return sad


def sign1(to_sign: bytes, pin: str, **kwargs) -> str:
    """
    Sign the input data using CSC by requesting SAD for its SHA256 hash.
    Returns the SAD token as string.

    NOTE(review): only credentials/authorize is called; no signing endpoint
    (e.g. signatures/signHash) is invoked with the SAD. Confirm whether the
    returned token is really usable as a signature by callers.
    """
    _announce("Preparing data for CSC signing...")

    digest = hashlib.sha256(to_sign).digest()
    hash_b64 = base64.b64encode(digest).decode("utf-8")
    logging.info(f"Hash sent to CSC (Base64): {hash_b64}")
    time.sleep(1)

    sad = get_sad(hash_b64, pin)

    logging.info("Data signed by CSC")
    time.sleep(1)
    return sad
requirements.txt
c2pa-python==0.6.1
cryptography
requests