Skip to main content

Reference files

main.py

This script takes a JPEG image as input and creates a C2PA content-authenticity manifest that includes the authorโ€™s details and a record of the actions performed. It then signs the manifest using a DigiCert Cloud Signature Consortium (CSC) service.

When you run the script, it asks for your PIN to unlock the remote signing key. After that, it reads your author information from the CSC credentials, builds the manifest, and uses the CSC signer to create a digital signature. The script then embeds that signature into the output image.

The final result is a new JPEG file with verifiable content-authenticity and provenance information. The cryptographic signature is tied to the image itself, making it easy for others to confirm that the file is genuine and hasnโ€™t been changed.

import logging
import json
import io
import base64
import time
import getpass
from c2pa import Builder, create_signer, SigningAlg
from dtmCSC import sign1, get_credentials_info  # CSC API

# --- Config ---
CERT_CHAIN_PATH = "chain.pem"
SIGNING_ALG = SigningAlg.PS256
TIMESTAMP_URL = "http://timestamp.digicert.com"

# Load certificate chain
with open(CERT_CHAIN_PATH, "rb") as f:
    CERT_CHAIN = f.read()


def log_step(msg: str):
    print("\n" + "=" * 60 + "\n")
    logging.info(f"{msg}")
    time.sleep(1)


def main(image_path: str, output_path: str):
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    # --- Step 1: Ask for PIN (hidden input) ---
    log_step("Step 1: ๐Ÿ”’ Enter PIN")
    pin = getpass.getpass("Enter PIN (hidden): ").strip()
    if not pin:
        raise ValueError("PIN cannot be empty")

    # --- Step 2: Fetch author info from CSC credentials API ---
    log_step("Step 2: ๐Ÿ“ก Fetching author info from CSC")
    author_name, author_email = "", ""
    try:
        creds_info, author_from_cert, email_from_api = get_credentials_info(pin)

        if author_from_cert:
            author_name = author_from_cert.strip()
        if email_from_api:
            author_email = email_from_api.strip()

    except Exception as e:
        logging.error("โŒ Failed to get credentials info", exc_info=e)

    # Fallbacks
    if not author_name:
        author_name = input("Author name not found from CSC. Enter author name to use: ").strip()
    if not author_email:
        author_email = ""

    logging.info(f"โœ… Using author: {author_name} ({author_email})")

    # --- Step 3: Build manifest ---
    log_step("Step 3: ๐Ÿ—๏ธ Building manifest")
    manifest_dict = {
        "title": image_path,
        "format": "image/jpeg",
        "claim_generator_info": [
            {"name": "DigiCert Content Trust Manager", "version": "1.0.0"}
        ],
        "assertions": [
            {
                "label": "stds.schema-org.CreativeWork",
                "data": {
                    "@context": "https://schema.org",
                    "@type": "CreativeWork",
                    "author": [
                        {"@type": "Person", "name": author_name, "email": author_email}
                    ],
                },
                "kind": "Json",
            },
            {
                "label": "c2pa.actions",
                "data": {
                    "actions": [
                        {
                            "action": "c2pa.created",
                            "softwareAgent": {
                                "name": "DigiCert Content Trust Manager",
                                "version": "1.0.0",
                            },
                        },
                        {
                            "action": "c2pa.signed",
                            "softwareAgent": {
                                "name": "DigiCert Content Trust Manager",
                                "version": "1.0.0",
                            },
                        },
                    ]
                },
                "kind": "Json",
            },
        ],
    }
    manifest_json = json.dumps(manifest_dict)

    try:
        # --- Step 4: Load image ---
        log_step("Step 4: ๐Ÿ–ผ๏ธ Loading input image")
        with open(image_path, "rb") as f:
            img_bytes = f.read()

        builder = Builder(manifest_json)

        # --- Step 5: CSC signer callback ---
        log_step("Step 5: โœ๏ธ Preparing CSC signer")

        def csc_signer(data: bytes, **kwargs):
            hash_b64 = base64.b64encode(data).decode("utf-8")
            logging.info(f"๐Ÿงพ Hash prepared for CSC: {hash_b64}")

            sad_response = sign1(data, pin)

            if isinstance(sad_response, str):
                return sad_response.encode("utf-8")
            elif isinstance(sad_response, dict) and "signedHash" in sad_response:
                return sad_response["signedHash"].encode("utf-8")
            else:
                raise ValueError(f"Unexpected SAD response: {sad_response}")

        signer = create_signer(csc_signer, SIGNING_ALG, CERT_CHAIN, TIMESTAMP_URL)

        # --- Step 6: Sign image ---
        log_step("Step 6: ๐Ÿ–Š๏ธ Signing image")
        result = io.BytesIO()
        builder.sign(signer, "image/jpeg", io.BytesIO(img_bytes), result)

        # --- Step 7: Write signed output ---
        log_step("Step 7: ๐Ÿ’พ Writing signed output")
        with open(output_path, "wb") as f:
            f.write(result.getvalue())

        logging.info(f"โœ… Signed file successfully written to {output_path}")

    except Exception as e:
        logging.error("โŒ Signing failed", exc_info=e)
        print(f"โŒ Error: {e}")


if __name__ == "__main__":
    import sys
    if len(sys.argv) != 3:
        print("Usage: python main1.py <input_image> <output_image>")
    else:
        main(sys.argv[1], sys.argv[2])

dtmCSC.py

import base64
import hashlib
import requests
import logging
from cryptography import x509
from cryptography.x509.oid import NameOID
import time

# Configure logging
logging.basicConfig(level=logging.INFO)

CSC_BASE_URL = "https://clientauth.demo.one.digicert.com/documentmanager/csc/v1"
CLIENT_CERT = "cert.pem"
CLIENT_KEY = "key.pem"
CREDENTIAL_ID = "basic_np-14-08-2025-11-01-44-165"  # Replace with your real credential ID


def extract_author_from_cert(cert_b64: str) -> str | None:
    """
    Extracts Common Name (CN) from a base64 DER certificate string.
    """
    try:
        cert_der = base64.b64decode(cert_b64)
        cert = x509.load_der_x509_certificate(cert_der)
        cn_attr = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        if cn_attr:
            return cn_attr[0].value
    except Exception as e:
        logging.error("โŒ Failed to extract CN from certificate", exc_info=e)
    return None


def get_credentials_info(pin: str):
    """
    Fetches author details from the DigiCert CSC credentials info API.
    Returns a tuple: (raw_data, author_name, author_email).
    """
    try:
        print("\n" + "=" * 60 + "\n")
        logging.info("๐Ÿ“ก Requesting credentials info from CSC API...")
        time.sleep(1)

        payload = {"PIN": pin, "credentialID": CREDENTIAL_ID}
        url = f"{CSC_BASE_URL}/credentials/info"
        resp = requests.post(
            url,
            json=payload,
            cert=(CLIENT_CERT, CLIENT_KEY),
            headers={"Content-Type": "application/json"},
            timeout=10
        )
        resp.raise_for_status()
        data = resp.json()

        # Try extracting from certificate
        certs = data.get("cert", {}).get("certificates", [])
        author_name = None
        if certs:
            author_name = extract_author_from_cert(certs[0])

        # CSC usually doesnโ€™t return email โ†’ keep blank
        author_email = ""

        logging.info("โœ… Credentials info successfully retrieved")
        time.sleep(1)

        return data, author_name, author_email
    except Exception as e:
        logging.error("โŒ Failed to fetch credentials info", exc_info=e)
        return {}, None, ""


def get_sad(hash_b64: str, pin: str) -> str:
    """
    Request a Signature Activation Data (SAD) token from CSC for signing a hash.
    """
    print("\n" + "=" * 60 + "\n")
    logging.info("๐Ÿ“ก Requesting SAD from CSC API...")
    time.sleep(1)

    url = f"{CSC_BASE_URL}/credentials/authorize"
    payload = {
        "credentialID": CREDENTIAL_ID,
        "PIN": pin,
        "hash": [hash_b64],
        "hashAlgo": "SHA256",
        "numSignatures": 1
    }
    resp = requests.post(
        url,
        json=payload,
        cert=(CLIENT_CERT, CLIENT_KEY),
        headers={"Content-Type": "application/json"},
        timeout=10
    )
    if resp.status_code != 200:
        logging.error(f"โŒ Failed to get SAD: {resp.status_code} {resp.text}")
        raise Exception(f"SAD request failed: {resp.text}")
    data = resp.json()

    logging.info("โœ… SAD successfully received")
    time.sleep(1)

    return data["SAD"]


def sign1(to_sign: bytes, pin: str, **kwargs) -> str:
    """
    Sign the input data using CSC by requesting SAD for its SHA256 hash.
    Returns the SAD token as string.
    """
    print("\n" + "=" * 60 + "\n")
    logging.info("๐Ÿ”‘ Preparing data for CSC signing...")
    time.sleep(1)

    digest = hashlib.sha256(to_sign).digest()
    hash_b64 = base64.b64encode(digest).decode("utf-8")
    logging.info(f"๐Ÿงพ Hash sent to CSC (Base64): {hash_b64}")
    time.sleep(1)

    sad = get_sad(hash_b64, pin)

    logging.info("โœ… Data signed by CSC")
    time.sleep(1)

    return sad

requirements.txt

c2pa-python==0.6.1

cryptography

requests