Reference files
main.py
import logging
import json
import io
import base64
import time
import getpass
from c2pa import Builder, create_signer, SigningAlg
from dtmCSC import sign1, get_credentials_info # CSC API
# --- Config ---
CERT_CHAIN_PATH = "chain.pem"
SIGNING_ALG = SigningAlg.PS256
TIMESTAMP_URL = "http://timestamp.digicert.com"
# Load certificate chain
with open(CERT_CHAIN_PATH, "rb") as f:
CERT_CHAIN = f.read()
def log_step(msg: str):
print("\n" + "=" * 60 + "\n")
logging.info(f"{msg}")
time.sleep(1)
def main(image_path: str, output_path: str):
logging.basicConfig(level=logging.INFO, format="%(message)s")
# --- Step 1: Ask for PIN (hidden input) ---
log_step("Step 1: 🔒 Enter PIN")
pin = getpass.getpass("Enter PIN (hidden): ").strip()
if not pin:
raise ValueError("PIN cannot be empty")
# --- Step 2: Fetch author info from CSC credentials API ---
log_step("Step 2: 📡 Fetching author info from CSC")
author_name, author_email = "", ""
try:
creds_info, author_from_cert, email_from_api = get_credentials_info(pin)
if author_from_cert:
author_name = author_from_cert.strip()
if email_from_api:
author_email = email_from_api.strip()
except Exception as e:
logging.error("❌ Failed to get credentials info", exc_info=e)
# Fallbacks
if not author_name:
author_name = input("Author name not found from CSC. Enter author name to use: ").strip()
if not author_email:
author_email = ""
logging.info(f"✅ Using author: {author_name} ({author_email})")
# --- Step 3: Build manifest ---
log_step("Step 3: 🏗️ Building manifest")
manifest_dict = {
"title": image_path,
"format": "image/jpeg",
"claim_generator_info": [
{"name": "DigiCert Content Trust Manager", "version": "1.0.0"}
],
"assertions": [
{
"label": "stds.schema-org.CreativeWork",
"data": {
"@context": "https://schema.org",
"@type": "CreativeWork",
"author": [
{"@type": "Person", "name": author_name, "email": author_email}
],
},
"kind": "Json",
},
{
"label": "c2pa.actions",
"data": {
"actions": [
{
"action": "c2pa.created",
"softwareAgent": {
"name": "DigiCert Content Trust Manager",
"version": "1.0.0",
},
},
{
"action": "c2pa.signed",
"softwareAgent": {
"name": "DigiCert Content Trust Manager",
"version": "1.0.0",
},
},
]
},
"kind": "Json",
},
],
}
manifest_json = json.dumps(manifest_dict)
try:
# --- Step 4: Load image ---
log_step("Step 4: 🖼️ Loading input image")
with open(image_path, "rb") as f:
img_bytes = f.read()
builder = Builder(manifest_json)
# --- Step 5: CSC signer callback ---
log_step("Step 5: ✍️ Preparing CSC signer")
def csc_signer(data: bytes, **kwargs):
hash_b64 = base64.b64encode(data).decode("utf-8")
logging.info(f"🧾 Hash prepared for CSC: {hash_b64}")
sad_response = sign1(data, pin)
if isinstance(sad_response, str):
return sad_response.encode("utf-8")
elif isinstance(sad_response, dict) and "signedHash" in sad_response:
return sad_response["signedHash"].encode("utf-8")
else:
raise ValueError(f"Unexpected SAD response: {sad_response}")
signer = create_signer(csc_signer, SIGNING_ALG, CERT_CHAIN, TIMESTAMP_URL)
# --- Step 6: Sign image ---
log_step("Step 6: 🖊️ Signing image")
result = io.BytesIO()
builder.sign(signer, "image/jpeg", io.BytesIO(img_bytes), result)
# --- Step 7: Write signed output ---
log_step("Step 7: 💾 Writing signed output")
with open(output_path, "wb") as f:
f.write(result.getvalue())
logging.info(f"✅ Signed file successfully written to {output_path}")
except Exception as e:
logging.error("❌ Signing failed", exc_info=e)
print(f"❌ Error: {e}")
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print("Usage: python main1.py <input_image> <output_image>")
else:
main(sys.argv[1], sys.argv[2])dtmCSC.py
import base64
import hashlib
import requests
import logging
from cryptography import x509
from cryptography.x509.oid import NameOID
import time
# Configure logging
logging.basicConfig(level=logging.INFO)
CSC_BASE_URL = "https://clientauth.demo.one.digicert.com/documentmanager/csc/v1"
CLIENT_CERT = "cert.pem"
CLIENT_KEY = "key.pem"
CREDENTIAL_ID = "basic_np-14-08-2025-11-01-44-165" # Replace with your real credential ID
def extract_author_from_cert(cert_b64: str) -> str | None:
"""
Extracts Common Name (CN) from a base64 DER certificate string.
"""
try:
cert_der = base64.b64decode(cert_b64)
cert = x509.load_der_x509_certificate(cert_der)
cn_attr = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
if cn_attr:
return cn_attr[0].value
except Exception as e:
logging.error("❌ Failed to extract CN from certificate", exc_info=e)
return None
def get_credentials_info(pin: str):
"""
Fetches author details from the DigiCert CSC credentials info API.
Returns a tuple: (raw_data, author_name, author_email).
"""
try:
print("\n" + "=" * 60 + "\n")
logging.info("📡 Requesting credentials info from CSC API...")
time.sleep(1)
payload = {"PIN": pin, "credentialID": CREDENTIAL_ID}
url = f"{CSC_BASE_URL}/credentials/info"
resp = requests.post(
url,
json=payload,
cert=(CLIENT_CERT, CLIENT_KEY),
headers={"Content-Type": "application/json"},
timeout=10
)
resp.raise_for_status()
data = resp.json()
# Try extracting from certificate
certs = data.get("cert", {}).get("certificates", [])
author_name = None
if certs:
author_name = extract_author_from_cert(certs[0])
# CSC usually doesn’t return email → keep blank
author_email = ""
logging.info("✅ Credentials info successfully retrieved")
time.sleep(1)
return data, author_name, author_email
except Exception as e:
logging.error("❌ Failed to fetch credentials info", exc_info=e)
return {}, None, ""
def get_sad(hash_b64: str, pin: str) -> str:
"""
Request a Signature Activation Data (SAD) token from CSC for signing a hash.
"""
print("\n" + "=" * 60 + "\n")
logging.info("📡 Requesting SAD from CSC API...")
time.sleep(1)
url = f"{CSC_BASE_URL}/credentials/authorize"
payload = {
"credentialID": CREDENTIAL_ID,
"PIN": pin,
"hash": [hash_b64],
"hashAlgo": "SHA256",
"numSignatures": 1
}
resp = requests.post(
url,
json=payload,
cert=(CLIENT_CERT, CLIENT_KEY),
headers={"Content-Type": "application/json"},
timeout=10
)
if resp.status_code != 200:
logging.error(f"❌ Failed to get SAD: {resp.status_code} {resp.text}")
raise Exception(f"SAD request failed: {resp.text}")
data = resp.json()
logging.info("✅ SAD successfully received")
time.sleep(1)
return data["SAD"]
def sign1(to_sign: bytes, pin: str, **kwargs) -> str:
"""
Sign the input data using CSC by requesting SAD for its SHA256 hash.
Returns the SAD token as string.
"""
print("\n" + "=" * 60 + "\n")
logging.info("🔑 Preparing data for CSC signing...")
time.sleep(1)
digest = hashlib.sha256(to_sign).digest()
hash_b64 = base64.b64encode(digest).decode("utf-8")
logging.info(f"🧾 Hash sent to CSC (Base64): {hash_b64}")
time.sleep(1)
sad = get_sad(hash_b64, pin)
logging.info("✅ Data signed by CSC")
time.sleep(1)
return sad
requirements.txt
c2pa-python==0.6.1
cryptography
requests