diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ModelCompatibility.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ModelCompatibility.py
index 584e20a9ce29e85ddd2d01716b3579c90996cdf8..684e28bbcb18cc45c8d9645ca7db31ba2c1cf9fc 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ModelCompatibility.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ModelCompatibility.py
@@ -391,3 +391,49 @@ def retValToListOfDict(retVal) -> list[dict]:
     """Convert a Records-style bookkeeping response to a list of dictionaries"""
     retVal = returnValueOrRaise(retVal)
     return [dict(zip(retVal["ParameterNames"], record)) for record in retVal["Records"]]
+def runs_to_input_query(runs):
+    """Convert a list of runs to a query for the bookkeeping system
+    The list of runs can contain any combination of:
+    - single run numbers as a string or integer
+    - run ranges as a string in the form "start:end" (inclusive)
+    """
+    for run in runs:
+        if isinstance(run, int):
+            run = str(run)
+        if ":" in run:
+            start, end = map(int, run.split(":"))
+            yield from map(str, range(start, end + 1))
+        else:
+            yield run
+def configure_input(pr, data, *, runs=None, startRun=None, endRun=None):
+    """Configure the input for a ProductionRequest object."""
+    scd = json.loads(data["SimCondDetail"])
+    import ast
+    if "[" in scd["inDataQualityFlag"]:
+        scd["inDataQualityFlag"] = ",".join(ast.literal_eval(scd["inDataQualityFlag"]))
+    pr.startRun = startRun or 0
+    pr.endRun = endRun or 0
+    if runs:
+        pr.runsList = ",".join(runs_to_input_query(runs))
+    pr.configName = scd["configName"]
+    pr.configVersion = scd["configVersion"]
+    pr.dqFlag = scd["inDataQualityFlag"]
+    if scd["inSMOG2State"]:
+        pr.smog2State = scd["inSMOG2State"]
+    pr.dataTakingConditions = data["SimCondition"]
+    pr.processingPass = scd["inProPass"]
+    pr.eventType = data["EventType"]
+    if scd["inProductionID"] == "ALL":
+        pr.bkFileType = scd["inFileType"]
+        pr.bkQueries = ["Full"]
+    else:
+        pr.previousProdID = scd["inProductionID"]
+        pr.bkQueries = ["fromPreviousProd"]
diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/test/Test_production_models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/test/Test_production_models.py
index f4a738333999eb2544d7def300041846f5e11419..7cf10935761f6c83303459772cfc6b13ea01b5ce 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/test/Test_production_models.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/test/Test_production_models.py
@@ -15,7 +15,10 @@ from pathlib import Path
 from DIRAC import S_OK
 from LHCbDIRAC.ProductionManagementSystem.Utilities.Models import parse_obj
-from LHCbDIRAC.ProductionManagementSystem.Utilities.ModelCompatibility import step_to_step_manager_dict
+from LHCbDIRAC.ProductionManagementSystem.Utilities.ModelCompatibility import (
+    step_to_step_manager_dict,
+    runs_to_input_query,
 EXAMPLE_YAML_FILES = list((Path(__file__).parent / "example_yamls").glob("*.yaml"))
@@ -81,3 +84,19 @@ def test_step_to_step_manager_dict(yaml_path, monkeypatch):
                     {"FileType": "B2OC.DST", "Visible": "N"},
                     {"FileType": "BNOC.DST", "Visible": "N"},
+    "runs, expected",
+    [
+        ([], ""),
+        ([1], "1"),
+        (["34"], "34"),
+        (["34", "35"], "34,35"),
+        (["34:36"], "34,35,36"),
+        (["34:36", "38"], "34,35,36,38"),
+        (["34:36", "38:40"], "34,35,36,38,39,40"),
+    ],
+def test_runs_to_input_query(runs, expected):
+    assert ",".join(runs_to_input_query(runs)) == expected
diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
index 13a373a0f2c172b591c20a2909b86336d5e2c383..c11e47389b5d52dda64002413a86d09ec6d0c718 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
@@ -13,6 +13,7 @@ from __future__ import annotations
 import json
 import os
+import random
 import shlex
 import subprocess
 import sys
@@ -20,7 +21,6 @@ import tempfile
 from collections import defaultdict
 from pathlib import Path
 from textwrap import dedent
-from typing import Optional
 import yaml
@@ -28,6 +28,8 @@ from DIRAC import gLogger
 from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals
 from DIRAC.Core.Base.Script import Script
 from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise, convertToReturnValue
 from LHCbDIRAC.ProductionManagementSystem.Utilities.Models import (
@@ -42,6 +44,8 @@ def parseArgs():
     inputFiles = None
     inputFileType = None
     ancestorDepth = None
+    numTestLFNs = 1
+    testRunNumbers = None
     def disableCfgOverride(_):
@@ -63,11 +67,23 @@ def parseArgs():
         nonlocal ancestorDepth
         ancestorDepth = int(s)
+    @convertToReturnValue
+    def setNumTestLFNs(s: str):
+        nonlocal numTestLFNs
+        numTestLFNs = int(s)
+    @convertToReturnValue
+    def setTestRunNumbers(s: str):
+        nonlocal testRunNumbers
+        testRunNumbers = s.split(",")
     switches = [
         ("", "input-files=", "Comma separated list of input files (Data only)", setInputFiles),
         ("", "no-cfg-override", "Internal implementation detail", disableCfgOverride),
         ("", "input-file-type=", "Limit the file type for generic merge steps", setOutputFileType),
         ("", "ancestor-depth=", "Set the ancestor depth that should be included in the pool catalog", setAncestorDepth),
+        ("", "num-test-lfns=", "Number of LFNs to test with", setNumTestLFNs),
+        ("", "test-runs=", "Comma separated list of test runs to use", setTestRunNumbers),
     Script.registerArgument("yaml_path: Path to the YAML file containing productions to submit")
@@ -82,7 +98,17 @@ def parseArgs():
         gLogger.fatal("Failed to contact CS, do you have a valid proxy?")
-    return Path(yaml_path), name, eventType, inputFiles, useCfgOverride, inputFileType, ancestorDepth
+    return (
+        Path(yaml_path),
+        name,
+        eventType,
+        inputFiles,
+        useCfgOverride,
+        inputFileType,
+        ancestorDepth,
+        numTestLFNs,
+        testRunNumbers,
+    )
 def _runWithConfigOverride(argv):
@@ -122,7 +148,9 @@ def _runWithConfigOverride(argv):
 def main():
-    yamlPath, name, eventType, inputFiles, useCfgOverride, inputFileType, ancestorDepth = parseArgs()
+    yamlPath, name, eventType, inputFiles, useCfgOverride, inputFileType, ancestorDepth, numTestLFNs, testRunNumbers = (
+        parseArgs()
+    )
     if useCfgOverride:
         return _runWithConfigOverride(sys.argv + ["--no-cfg-override"])
@@ -179,6 +207,8 @@ def main():
+        numTestLFNs=numTestLFNs,
+        testRunNumbers=testRunNumbers,
@@ -190,8 +220,15 @@ def testProductionRequest(
     inputFiles: list[str] | None = None,
     inputFileType: str | None = None,
+    numTestLFNs: int = 1,
+    testRunNumbers: list[str] | None = None,
-    from LHCbDIRAC.ProductionManagementSystem.Utilities.ModelCompatibility import production_to_legacy_dict
+    from DIRAC.DataManagementSystem.Client.DataManager import DataManager
+    from LHCbDIRAC.BookkeepingSystem.Client.BookkeepingClient import BookkeepingClient
+    from LHCbDIRAC.ProductionManagementSystem.Utilities.ModelCompatibility import (
+        production_to_legacy_dict,
+        configure_input,
+    )
     from LHCbDIRAC.ProductionManagementSystem.Client.ProductionRequest import ProductionRequest
     pr = ProductionRequest()
@@ -211,7 +248,37 @@ def testProductionRequest(
     elif isinstance(productionRequest, DataProduction):
         if not inputFiles:
-            raise ValueError("No input files specified")
+            configure_input(pr, legacy_dict, runs=testRunNumbers)
+            bkQueryDict = pr._getBKKQuery()
+            result = returnValueOrRaise(BookkeepingClient().getFilesWithMetadata(bkQueryDict))
+            if result["TotalRecords"] == 0:
+                raise ValueError("No input files found in the bookkeeping")
+            # Remove the smallest 50% of files to avoid unusually small files
+            sizeIndex = result["ParameterNames"].index("FileSize")
+            records = sorted(result["Records"], key=lambda x: x[sizeIndex])
+            if len(records) // 2 >= numTestLFNs:
+                records = records[len(records) // 2 :]
+            # Shuffle the LFNs so we pick a random one
+            random.shuffle(records)
+            # Only run tests with files which have available replicas
+            filenameIndex = result["ParameterNames"].index("FileName")
+            inputFiles = []
+            for record in records:
+                lfn = record[filenameIndex]
+                result = returnValueOrRaise(DataManager().getReplicasForJobs([lfn]))
+                inputFiles.extend(result["Successful"])
+                if len(inputFiles) == numTestLFNs:
+                    break
+                if result["Failed"]:
+                    gLogger.warn("Skipping LFN as it has no replicas for jobs", result["Failed"])
+            else:
+                raise ValueError(f"Only {len(inputFiles)} input files found, requested {numTestLFNs}")
+        if len(inputFiles) < numTestLFNs:
+            raise ValueError(f"Only {len(inputFiles)} input files found, requested {numTestLFNs}")
         kwargs |= dict(