Skip to main content

Reference files

main.py

import logging
import json
import io
import base64
import time
import getpass
from c2pa import Builder, create_signer, SigningAlg
from dtmCSC import sign1, get_credentials_info  # CSC API

# --- Config ---
CERT_CHAIN_PATH = "chain.pem"
SIGNING_ALG = SigningAlg.PS256
TIMESTAMP_URL = "http://timestamp.digicert.com"

# Load certificate chain
with open(CERT_CHAIN_PATH, "rb") as f:
    CERT_CHAIN = f.read()


def log_step(msg: str):
    print("\n" + "=" * 60 + "\n")
    logging.info(f"{msg}")
    time.sleep(1)


def main(image_path: str, output_path: str):
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    # --- Step 1: Ask for PIN (hidden input) ---
    log_step("Step 1: πŸ”’ Enter PIN")
    pin = getpass.getpass("Enter PIN (hidden): ").strip()
    if not pin:
        raise ValueError("PIN cannot be empty")

    # --- Step 2: Fetch author info from CSC credentials API ---
    log_step("Step 2: πŸ“‘ Fetching author info from CSC")
    author_name, author_email = "", ""
    try:
        creds_info, author_from_cert, email_from_api = get_credentials_info(pin)

        if author_from_cert:
            author_name = author_from_cert.strip()
        if email_from_api:
            author_email = email_from_api.strip()

    except Exception as e:
        logging.error("❌ Failed to get credentials info", exc_info=e)

    # Fallbacks
    if not author_name:
        author_name = input("Author name not found from CSC. Enter author name to use: ").strip()
    if not author_email:
        author_email = ""

    logging.info(f"βœ… Using author: {author_name} ({author_email})")

    # --- Step 3: Build manifest ---
    log_step("Step 3: πŸ—οΈ Building manifest")
    manifest_dict = {
        "title": image_path,
        "format": "image/jpeg",
        "claim_generator_info": [
            {"name": "DigiCert Content Trust Manager", "version": "1.0.0"}
        ],
        "assertions": [
            {
                "label": "stds.schema-org.CreativeWork",
                "data": {
                    "@context": "https://schema.org",
                    "@type": "CreativeWork",
                    "author": [
                        {"@type": "Person", "name": author_name, "email": author_email}
                    ],
                },
                "kind": "Json",
            },
            {
                "label": "c2pa.actions",
                "data": {
                    "actions": [
                        {
                            "action": "c2pa.created",
                            "softwareAgent": {
                                "name": "DigiCert Content Trust Manager",
                                "version": "1.0.0",
                            },
                        },
                        {
                            "action": "c2pa.signed",
                            "softwareAgent": {
                                "name": "DigiCert Content Trust Manager",
                                "version": "1.0.0",
                            },
                        },
                    ]
                },
                "kind": "Json",
            },
        ],
    }
    manifest_json = json.dumps(manifest_dict)

    try:
        # --- Step 4: Load image ---
        log_step("Step 4: πŸ–ΌοΈ Loading input image")
        with open(image_path, "rb") as f:
            img_bytes = f.read()

        builder = Builder(manifest_json)

        # --- Step 5: CSC signer callback ---
        log_step("Step 5: ✍️ Preparing CSC signer")

        def csc_signer(data: bytes, **kwargs):
            hash_b64 = base64.b64encode(data).decode("utf-8")
            logging.info(f"🧾 Hash prepared for CSC: {hash_b64}")

            sad_response = sign1(data, pin)

            if isinstance(sad_response, str):
                return sad_response.encode("utf-8")
            elif isinstance(sad_response, dict) and "signedHash" in sad_response:
                return sad_response["signedHash"].encode("utf-8")
            else:
                raise ValueError(f"Unexpected SAD response: {sad_response}")

        signer = create_signer(csc_signer, SIGNING_ALG, CERT_CHAIN, TIMESTAMP_URL)

        # --- Step 6: Sign image ---
        log_step("Step 6: πŸ–ŠοΈ Signing image")
        result = io.BytesIO()
        builder.sign(signer, "image/jpeg", io.BytesIO(img_bytes), result)

        # --- Step 7: Write signed output ---
        log_step("Step 7: πŸ’Ύ Writing signed output")
        with open(output_path, "wb") as f:
            f.write(result.getvalue())

        logging.info(f"βœ… Signed file successfully written to {output_path}")

    except Exception as e:
        logging.error("❌ Signing failed", exc_info=e)
        print(f"❌ Error: {e}")


if __name__ == "__main__":
    import sys
    if len(sys.argv) != 3:
        print("Usage: python main1.py <input_image> <output_image>")
    else:
        main(sys.argv[1], sys.argv[2])

dtmCSC.py

import base64
import hashlib
import requests
import logging
from cryptography import x509
from cryptography.x509.oid import NameOID
import time

# Configure logging
logging.basicConfig(level=logging.INFO)

CSC_BASE_URL = "https://clientauth.demo.one.digicert.com/documentmanager/csc/v1"
CLIENT_CERT = "cert.pem"
CLIENT_KEY = "key.pem"
CREDENTIAL_ID = "basic_np-14-08-2025-11-01-44-165"  # Replace with your real credential ID


def extract_author_from_cert(cert_b64: str) -> str | None:
    """
    Extracts Common Name (CN) from a base64 DER certificate string.
    """
    try:
        cert_der = base64.b64decode(cert_b64)
        cert = x509.load_der_x509_certificate(cert_der)
        cn_attr = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        if cn_attr:
            return cn_attr[0].value
    except Exception as e:
        logging.error("❌ Failed to extract CN from certificate", exc_info=e)
    return None


def get_credentials_info(pin: str):
    """
    Fetches author details from the DigiCert CSC credentials info API.
    Returns a tuple: (raw_data, author_name, author_email).
    """
    try:
        print("\n" + "=" * 60 + "\n")
        logging.info("πŸ“‘ Requesting credentials info from CSC API...")
        time.sleep(1)

        payload = {"PIN": pin, "credentialID": CREDENTIAL_ID}
        url = f"{CSC_BASE_URL}/credentials/info"
        resp = requests.post(
            url,
            json=payload,
            cert=(CLIENT_CERT, CLIENT_KEY),
            headers={"Content-Type": "application/json"},
            timeout=10
        )
        resp.raise_for_status()
        data = resp.json()

        # Try extracting from certificate
        certs = data.get("cert", {}).get("certificates", [])
        author_name = None
        if certs:
            author_name = extract_author_from_cert(certs[0])

        # CSC usually doesn’t return email β†’ keep blank
        author_email = ""

        logging.info("βœ… Credentials info successfully retrieved")
        time.sleep(1)

        return data, author_name, author_email
    except Exception as e:
        logging.error("❌ Failed to fetch credentials info", exc_info=e)
        return {}, None, ""


def get_sad(hash_b64: str, pin: str) -> str:
    """
    Request a Signature Activation Data (SAD) token from CSC for signing a hash.
    """
    print("\n" + "=" * 60 + "\n")
    logging.info("πŸ“‘ Requesting SAD from CSC API...")
    time.sleep(1)

    url = f"{CSC_BASE_URL}/credentials/authorize"
    payload = {
        "credentialID": CREDENTIAL_ID,
        "PIN": pin,
        "hash": [hash_b64],
        "hashAlgo": "SHA256",
        "numSignatures": 1
    }
    resp = requests.post(
        url,
        json=payload,
        cert=(CLIENT_CERT, CLIENT_KEY),
        headers={"Content-Type": "application/json"},
        timeout=10
    )
    if resp.status_code != 200:
        logging.error(f"❌ Failed to get SAD: {resp.status_code} {resp.text}")
        raise Exception(f"SAD request failed: {resp.text}")
    data = resp.json()

    logging.info("βœ… SAD successfully received")
    time.sleep(1)

    return data["SAD"]


def sign1(to_sign: bytes, pin: str, **kwargs) -> str:
    """
    Sign the input data using CSC by requesting SAD for its SHA256 hash.
    Returns the SAD token as string.
    """
    print("\n" + "=" * 60 + "\n")
    logging.info("πŸ”‘ Preparing data for CSC signing...")
    time.sleep(1)

    digest = hashlib.sha256(to_sign).digest()
    hash_b64 = base64.b64encode(digest).decode("utf-8")
    logging.info(f"🧾 Hash sent to CSC (Base64): {hash_b64}")
    time.sleep(1)

    sad = get_sad(hash_b64, pin)

    logging.info("βœ… Data signed by CSC")
    time.sleep(1)

    return sad

requirements.txt

c2pa-python==0.6.1

cryptography

requests