Skip to content

Commit 4881243

Browse files
DEVEX-2578 dxCompiler Option for Parallel Compilation of Applets [by Fulcrum Genomics] (#510)
--------- Co-authored-by: Ted Brookings <[email protected]>
1 parent eba94bc commit 4881243

30 files changed

+896
-121
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## Unreleased
44

5+
* Added option `-executableCreationParallelism <int>`, the maximum number of platform executables that dxCompiler can
6+
create in parallel, defaults to 1.
7+
58
### Dependency updates
69

710
* dxApi [0.13.13](https://github.com/dnanexus/dxScala/releases/tag/api-0.13.13)

build.sbt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ lazy val dependencies =
136136
val scalatestVersion = "3.2.9"
137137
val logbackVersion = "1.2.10"
138138
val mockitoVersion = "3.2.10.0"
139+
val parallelCollectionsVersion = "1.2.0"
139140

140141
val dxCommon = "com.dnanexus" % "dxcommon" % dxCommonVersion
141142
val dxApi = "com.dnanexus" % "dxapi" % dxApiVersion
@@ -148,6 +149,7 @@ lazy val dependencies =
148149
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
149150
val scalatest = "org.scalatest" % "scalatest_2.13" % scalatestVersion
150151
val mockito = "org.scalatestplus" %% "mockito-3-4" % mockitoVersion % "test"
152+
val parallelCollections = "org.scala-lang.modules" %% "scala-parallel-collections" % parallelCollectionsVersion
151153
}
152154

153155
lazy val commonDependencies = Seq(
@@ -157,7 +159,8 @@ lazy val commonDependencies = Seq(
157159
dependencies.logback,
158160
dependencies.spray,
159161
dependencies.scalatest % Test,
160-
dependencies.mockito
162+
dependencies.mockito,
163+
dependencies.parallelCollections
161164
)
162165

163166
// SETTINGS

compiler/src/main/scala/dx/compiler/Compiler.scala

Lines changed: 139 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import spray.json.{JsValue, _}
2626
import dx.util.{FileSourceResolver, FileUtils, JsUtils, Logger, TraceLevel}
2727

2828
import scala.jdk.CollectionConverters._
29+
import scala.collection.immutable.VectorBuilder
30+
import dx.parallel.ParallelDef.seqToParSupport
2931

3032
object Compiler {
3133
val RuntimeConfigFile = "dxCompiler_runtime.conf"
@@ -69,6 +71,7 @@ case class Compiler(extras: Option[Extras],
6971
instanceTypeSelection: InstanceTypeSelection.InstanceTypeSelection,
7072
defaultInstanceType: Option[String],
7173
fileResolver: FileSourceResolver = FileSourceResolver.get,
74+
executableCreationParallelism: Int,
7275
dxApi: DxApi = DxApi.get,
7376
logger: Logger = Logger.get) {
7477
// logger for extra trace info
@@ -160,6 +163,7 @@ case class Compiler(extras: Option[Extras],
160163

161164
// Add a checksum to a request
162165
private def checksumRequest(name: String,
166+
versionTag: String,
163167
desc: Map[String, JsValue]): (Map[String, JsValue], String) = {
164168
logger2.trace(
165169
s"""|${name} -> checksum request
@@ -193,7 +197,7 @@ case class Compiler(extras: Option[Extras],
193197
}
194198
val updatedDetails = existingDetails ++
195199
Map(
196-
Constants.Version -> JsString(getVersion),
200+
Constants.Version -> JsString(versionTag),
197201
Constants.Checksum -> JsString(digest)
198202
)
199203
// Add properties and attributes we don't want to fall under the checksum
@@ -317,6 +321,7 @@ case class Compiler(extras: Option[Extras],
317321
*/
318322
private def maybeBuildApplet(
319323
applet: Application,
324+
versionTag: String,
320325
dependencyDict: Map[String, CompiledExecutable]
321326
): (DxApplet, Vector[ExecutableLink]) = {
322327
logger2.trace(s"Compiling applet ${applet.name}")
@@ -354,6 +359,7 @@ case class Compiler(extras: Option[Extras],
354359
// Calculate a checksum of the inputs that went into the making of the applet.
355360
val (appletApiRequest, digest) = checksumRequest(
356361
applet.name,
362+
versionTag,
357363
appletCompiler.apply(applet, dependencies)
358364
)
359365
// write the request to a file, in case we need it for debugging
@@ -389,6 +395,7 @@ case class Compiler(extras: Option[Extras],
389395
*/
390396
private def maybeBuildWorkflow(
391397
workflow: Workflow,
398+
versionTag: String,
392399
dependencyDict: Map[String, CompiledExecutable]
393400
): (DxWorkflow, JsValue) = {
394401
logger2.trace(s"Compiling workflow ${workflow.name}")
@@ -405,7 +412,8 @@ case class Compiler(extras: Option[Extras],
405412
logger2)
406413
// Calculate a checksum of the inputs that went into the making of the applet.
407414
val (workflowApiRequest, execTree) = workflowCompiler.apply(workflow, dependencyDict)
408-
val (requestWithChecksum, digest) = checksumRequest(workflow.name, workflowApiRequest)
415+
val (requestWithChecksum, digest) =
416+
checksumRequest(workflow.name, versionTag, workflowApiRequest)
409417
// Add properties we do not want to fall under the checksum.
410418
// This allows, for example, moving the dx:executable, while
411419
// still being able to reuse it.
@@ -434,53 +442,143 @@ case class Compiler(extras: Option[Extras],
434442
(dxWf, execTree)
435443
}
436444

445+
/**
446+
* Compile a single executable,
447+
* @param name the Callable name to build
448+
* @param dependencyDict the dependencies needed for this executable
449+
* @return
450+
*/
451+
private def buildExecutable(
452+
name: String,
453+
versionTag: String,
454+
dependencyDict: Map[String, CompiledExecutable]
455+
): (String, CompiledExecutable) = {
456+
bundle.allCallables(name) match {
457+
case application: Application =>
458+
val execRecord = application.kind match {
459+
case _: ExecutableKindNative if useManifests =>
460+
throw new Exception("cannot use manifest files with native app(let)s")
461+
case ExecutableKindNative(ExecutableType.App | ExecutableType.Applet,
462+
Some(id),
463+
_,
464+
_,
465+
_) =>
466+
// native app(let)s do not depend on other data-objects
467+
CompiledExecutable(application, dxApi.executable(id))
468+
case ExecutableKindNative(ExecutableType.Applet, _, _, project, Some(path)) =>
469+
val applet = dxApi.resolveDataObject(path, project.map(dxApi.project)) match {
470+
case applet: DxApplet => applet
471+
case _ =>
472+
throw new Exception(
473+
s"${path} in ${project.getOrElse("current project")} is not an applet"
474+
)
475+
}
476+
CompiledExecutable(application, applet)
477+
case ExecutableKindNative(ExecutableType.App, _, Some(name), _, _) =>
478+
CompiledExecutable(application, dxApi.resolveApp(name))
479+
case ExecutableKindWorkflowCustomReorg(id) =>
480+
// for now, we assume the user has built their reorg applet to handle manifest
481+
// input if useManifests = true
482+
CompiledExecutable(application, dxApi.executable(id))
483+
case _ =>
484+
val (dxApplet, dependencies) =
485+
try {
486+
maybeBuildApplet(application, versionTag, dependencyDict)
487+
} catch {
488+
case t: Throwable =>
489+
throw new RuntimeException(
490+
"Building applet '" + application.name + "': " + t.toString
491+
)
492+
}
493+
494+
CompiledExecutable(application, dxApplet, dependencies)
495+
}
496+
application.name -> execRecord
497+
case wf: Workflow =>
498+
val (dxWorkflow, execTree) =
499+
try {
500+
maybeBuildWorkflow(wf, versionTag, dependencyDict)
501+
} catch {
502+
case t: Throwable =>
503+
throw new RuntimeException("Building workflow '" + wf.name + "': " + t.toString)
504+
}
505+
wf.name -> CompiledExecutable(wf, dxWorkflow, execTree = Some(execTree))
506+
}
507+
}
508+
509+
private def getCompileOrder: Vector[Vector[String]] = {
510+
val callableNames = bundle.allCallables.keySet
511+
val deps: Map[String, Set[String]] = bundle.allCallables.values.map { callable: Callable =>
512+
val callableDeps = callable match {
513+
case application: Application =>
514+
application.kind match {
515+
case ExecutableKindWfFragment(call, _, _, _) =>
516+
call.toList.toSet.intersect(callableNames)
517+
case _ => Set.empty[String]
518+
}
519+
case workflow: Workflow =>
520+
workflow.stages.map(_.calleeName).toSet.intersect(callableNames)
521+
}
522+
(callable.name, callableDeps)
523+
}.toMap
524+
525+
val subBlocks = new VectorBuilder[Vector[String]]
526+
var allSatisfied = Set.empty[String]
527+
var remainingNames = bundle.dependencies
528+
logger.trace("Finding blocks of parallelizable callables to build")
529+
while (remainingNames.nonEmpty) {
530+
val (satisfied, unsatisfied) = remainingNames.partition(c => deps(c).subsetOf(allSatisfied))
531+
if (satisfied.isEmpty) {
532+
throw new RuntimeException(
533+
f"Unable to satisfy all dependencies of ${unsatisfied}:\ndeps=${deps}"
534+
)
535+
}
536+
logger.trace(
537+
s"\tblock ${subBlocks.size} callables: $satisfied"
538+
)
539+
subBlocks += satisfied
540+
allSatisfied |= satisfied.toSet
541+
remainingNames = unsatisfied
542+
}
543+
logger.trace("Done finding blocks of parallelizable callables")
544+
subBlocks.result()
545+
}
546+
437547
def apply: CompilerResults = {
438548
logger.trace(
439549
s"Generate dx:applets and dx:workflows for ${bundle} in ${project.id}${folder}"
440550
)
441-
val executables = bundle.dependencies.foldLeft(Map.empty[String, CompiledExecutable]) {
442-
case (accu, name) =>
443-
bundle.allCallables(name) match {
444-
case application: Application =>
445-
val execRecord = application.kind match {
446-
case _: ExecutableKindNative if useManifests =>
447-
throw new Exception("cannot use manifest files with native app(let)s")
448-
case ExecutableKindNative(ExecutableType.App | ExecutableType.Applet,
449-
Some(id),
450-
_,
451-
_,
452-
_) =>
453-
// native app(let)s do not depend on other data-objects
454-
CompiledExecutable(application, dxApi.executable(id))
455-
case ExecutableKindNative(ExecutableType.Applet, _, _, project, Some(path)) =>
456-
val applet = dxApi.resolveDataObject(path, project.map(dxApi.project)) match {
457-
case applet: DxApplet => applet
458-
case _ =>
459-
throw new Exception(
460-
s"${path} in ${project.getOrElse("current project")} is not an applet"
461-
)
462-
}
463-
CompiledExecutable(application, applet)
464-
case ExecutableKindNative(ExecutableType.App, _, Some(name), _, _) =>
465-
CompiledExecutable(application, dxApi.resolveApp(name))
466-
case ExecutableKindWorkflowCustomReorg(id) =>
467-
// for now, we assume the user has built their reorg applet to handle manifest
468-
// input if useManifests = true
469-
CompiledExecutable(application, dxApi.executable(id))
470-
case _ =>
471-
val (dxApplet, dependencies) = maybeBuildApplet(application, accu)
472-
CompiledExecutable(application, dxApplet, dependencies)
551+
logger.trace(
552+
s""
553+
)
554+
val versionTag: String = getVersion
555+
var stage: Int = 0
556+
val compileOrder = getCompileOrder
557+
val executables: Map[String, CompiledExecutable] =
558+
compileOrder.foldLeft(Map.empty[String, CompiledExecutable]) {
559+
// compile each block of mutually-independent callables, and concatenate into the map
560+
// all executables from previous blocks (possible dependencies) will be stored in "executables"
561+
case (executables: Map[String, CompiledExecutable],
562+
blockExecutableNames: Vector[String]) =>
563+
logger.info(
564+
s"Parallel compile stage $stage with ${executables.size} old executables and ${blockExecutableNames.size} new executables"
565+
)
566+
val blockExecutables = blockExecutableNames
567+
.parWith(parallelism = executableCreationParallelism)
568+
.map { name =>
569+
buildExecutable(name, versionTag, executables)
473570
}
474-
accu + (application.name -> execRecord)
475-
case wf: Workflow =>
476-
val (dxWorkflow, execTree) = maybeBuildWorkflow(wf, accu)
477-
accu + (wf.name -> CompiledExecutable(wf, dxWorkflow, execTree = Some(execTree)))
478-
}
479-
}
571+
.toMap
572+
.seq
573+
stage += 1
574+
// accumulate the executables from this block
575+
executables ++ blockExecutables
576+
}
577+
480578
val primary: Option[CompiledExecutable] = bundle.primaryCallable.flatMap { c =>
481579
executables.get(c.name)
482580
}
483-
CompilerResults(primary, executables)
581+
CompilerResults(primary, executables, compileOrder.flatten)
484582
}
485583
}
486584

compiler/src/main/scala/dx/compiler/ExecutableTree.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ case class CompiledExecutable(callable: Callable,
1111
execTree: Option[JsValue] = None)
1212

1313
case class CompilerResults(primary: Option[CompiledExecutable],
14-
executables: Map[String, CompiledExecutable]) {
14+
executables: Map[String, CompiledExecutable],
15+
executableOrder: Vector[String]) {
1516
def executableIds: Vector[String] = {
1617
primary match {
17-
case None => executables.values.map(_.dxExec.id).toVector
18+
case None => (executableOrder collect executables).map(_.dxExec.id)
1819
case Some(obj) => Vector(obj.dxExec.id)
1920
}
2021
}

compiler/src/main/scala/dx/compiler/WorkflowCompiler.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,15 @@ case class WorkflowCompiler(separateOutputs: Boolean,
460460
// build the "stages" part of the API request
461461
val stages =
462462
workflow.stages.map { stage =>
463-
val CompiledExecutable(irExecutable, dxExec, _, _) = executableDict(stage.calleeName)
463+
val CompiledExecutable(irExecutable, dxExec, _, _) =
464+
try {
465+
executableDict(stage.calleeName)
466+
} catch {
467+
case e: java.util.NoSuchElementException =>
468+
throw new java.util.NoSuchElementException(
469+
e.toString + "\nHave keys: " + executableDict.keys.mkString(",")
470+
)
471+
}
464472
val linkedInputs = if (useManifests) {
465473
// when using manifests, we have to create an input array of all the
466474
// manifests output by any linked stages, and a hash of links between
@@ -599,6 +607,7 @@ case class WorkflowCompiler(separateOutputs: Boolean,
599607
).flatten.toMap
600608
)
601609
}
610+
602611
// build the details JSON
603612
val defaultTags = Set(Constants.CompilerTag)
604613
// compress and base64 encode the source code

0 commit comments

Comments
 (0)