How Storm SQL is executed

I’m working on a project to provide a SQL interface for real-time data processing on Beam. After some research, Storm SQL turns out to be the closest match for my scenario, with no new features pending.

The main idea of Storm SQL is quite straightforward, as shown in the image below:
1. The SQL is parsed with Calcite, which generates an execution plan;
2. The execution plan is expressed as a Trident topology;
3. The Trident topology is submitted to the Storm cluster.

Workflow of Storm SQL (figure)

Next, let’s move to the Storm code base to see how it’s implemented.

Note that Storm SQL is a new feature in version 2.0.0-SNAPSHOT, and not officially available yet. So first of all, you need to clone the code base from GitHub: Storm, master branch. You can import only the Storm SQL part, under storm/external/sql.

Step 1. the launcher

Refer to Wiki: Storm SQL Integration. The command to run Storm SQL is:

$ bin/storm sql <sql-file> <topo-name>

In storm.py, it’s actually launched as

exec_storm_class(
        "org.apache.storm.sql.StormSqlRunner",
        jvmtype="-client",
        extrajars=extrajars,
        args=args,
        daemon=False,
        jvmopts=["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])

org.apache.storm.sql.StormSqlRunner is the first place we start. Depending on the input parameters, it will either:
1. output the execution plan, via the explain function in StormSqlImpl.java; or
2. submit the job, via the submit function in StormSqlImpl.java.
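
To make the dispatch concrete, here is a minimal sketch of such an entry point. This is not the actual StormSqlRunner source: the --explain flag, the StormSql.construct() factory, and the simplified submit() signature are assumptions for illustration; explain and submit are the StormSqlImpl functions walked through below.

// Sketch only, NOT the real StormSqlRunner. Assumed for illustration: an --explain
// flag, a StormSql.construct() factory, and a simplified submit() signature
// (the real one takes storm conf, submit options, a progress listener, etc.).
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class SqlRunnerSketch {
    public static void main(String[] args) throws Exception {
        // args[0] = <sql-file>, args[1] = <topo-name> or --explain (hypothetical flag)
        List<String> statements = Files.readAllLines(Paths.get(args[0])); // simplification: one statement per line
        StormSql sql = StormSql.construct();            // assumed factory for the StormSqlImpl facade
        if ("--explain".equals(args[1])) {
            sql.explain(statements);                    // print the execution plan of each statement
        } else {
            sql.submit(args[1], statements);            // build the Trident topology and submit it (signature simplified)
        }
    }
}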

Step 2. StormSql and StormParser

Before going into the details of Explain/Submit, I’d like to pay more attention to the classes StormSql and StormParser.

StormSql has an implementation class StormSqlImpl that coordinates the steps from a SQL string to the Explain output, or to a Trident topology.

StormParser has an implementation class StormParserImpl for the physical parsing work. You may find that there’s no StormParserImpl.java file in the source tree; it’s generated by running mvn generate-sources.

How the parser is generated is another BIG topic; I’ll put the details in another post.
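
For a feel of what the parser hands back, here is a small usage fragment (the ORDERS query is made up, and it assumes storm-sql-core and Calcite are on the classpath):

// Fragment only; the same two calls appear in the explain() code below.
StormParser parser = new StormParser("SELECT ID FROM ORDERS WHERE UNIT_PRICE > 20");
SqlNode node = parser.impl().parseSqlStmtEof();   // parse one statement up to EOF

System.out.println(node.getKind());   // the SqlKind of the statement, e.g. SELECT
System.out.println(node);             // the statement re-serialized as SQL text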

Step 3. explain a SQL

The top-level code for explain is included below. It takes a list of SQL statements and prints out the execution plan for each. This part doesn’t touch much on the Calcite-Storm integration; it’s mostly standard usage of the Calcite parser/planner itself.

public void explain(Iterable<String> statements) throws Exception {
    Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
    for (String sql : statements) {
        StormParser parser = new StormParser(sql);
        SqlNode node = parser.impl().parseSqlStmtEof();

        System.out.println("===========================================================");
        System.out.println("query>");
        System.out.println(sql);
        System.out.println("-----------------------------------------------------------");

        if (node instanceof SqlCreateTable) {
            handleCreateTableForTrident((SqlCreateTable) node, dataSources);
            System.out.println("No plan presented on DDL");
        } else if (node instanceof SqlCreateFunction) {
            handleCreateFunction((SqlCreateFunction) node);
            System.out.println("No plan presented on DDL");
        } else {
            FrameworkConfig config = buildFrameWorkConfig();
            Planner planner = Frameworks.getPlanner(config);
            SqlNode parse = planner.parse(sql);
            SqlNode validate = planner.validate(parse);
            RelNode tree = planner.convert(validate);

            String plan = StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
            System.out.println("plan>");
            System.out.println(plan);
        }

        System.out.println("===========================================================");
    }
}

a. L5, Detect statement type

SqlNode node = parser.impl().parseSqlStmtEof();

This line parses the SQL statement string and generates a SqlNode representing its syntax structure. The image below shows all the statement types supported by Calcite.
(figure: SqlNode statement types in Calcite)

b. L12-14, define datasources

if (node instanceof SqlCreateTable) {
    handleCreateTableForTrident((SqlCreateTable) node, dataSources);
    System.out.println("No plan presented on DDL");
}

If it’s a CREATE TABLE statement, an ISqlTridentDataSource is created based on the table’s schema. ISqlTridentDataSource defines how to read data from the source and how to output data with Trident. See Calcite Adapter to understand how Calcite handles data binding.
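
To give a rough picture of what that data binding means on the Calcite side: a table only has to describe its row type so the planner can validate queries against it. A toy sketch (illustrative only, not Storm’s actual table class; the ORDERS columns are made up):

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;

// Illustrative only: a minimal Calcite table exposing an ORDERS(ID, UNIT_PRICE) row type.
// Storm's real code additionally wires the Trident producer/consumer behind such a table.
public class OrdersTable extends AbstractTable {
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        return typeFactory.builder()
                .add("ID", SqlTypeName.INTEGER)
                .add("UNIT_PRICE", SqlTypeName.INTEGER)
                .build();
    }
}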

c. L15-17, add UDF/UDAF

if (node instanceof SqlCreateFunction) {
    handleCreateFunction((SqlCreateFunction) node);
    System.out.println("No plan presented on DDL");
}

UDF and UDAF are supported; you can visit the wiki: Specifying UDF and UDAF for more details.
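
As a quick illustration, a scalar UDF is just a class whose static method implements the function. A sketch, assuming a public static evaluate method (check the wiki for the exact naming convention) and a made-up class name:

// Hypothetical UDF sketch; the evaluate-method convention is an assumption, see the wiki.
// It would be registered with something like: CREATE FUNCTION MYPLUS AS 'com.example.MyPlus'
public class MyPlus {
    public static Integer evaluate(Integer x, Integer y) {
        return x + y;
    }
}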

d. L19-27, Execution planner

FrameworkConfig config = buildFrameWorkConfig();
Planner planner = Frameworks.getPlanner(config);
SqlNode parse = planner.parse(sql);
SqlNode validate = planner.validate(parse);
RelNode tree = planner.convert(validate);

String plan = StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
System.out.println("plan>");
System.out.println(plan);
  • L1, a StdFrameworkConfig instance is created as the planning config;
  • L2, an instance of Calcite’s default planner org.apache.calcite.prepare.PlannerImpl is created;
  • L3-5, the planner parses and validates the SQL statement, and converts it into a RelNode. A RelNode is a tree of relational expressions (see the small RelBuilder sketch below); see Calcite: Algebra for more details of SQL in the back-end;
  • L7, StormRelUtils outputs the execution plan represented by the RelNode in an easy-to-read format.
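
To get a feel for what a RelNode tree is, here is a self-contained Calcite sketch (not Storm code) that builds the same kind of tree by hand with RelBuilder; the OrdersSchema/ORDERS names are made up for the example:

import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;

public class RelNodeSketch {
    // A tiny in-memory schema so the example runs without any external source.
    public static class OrdersSchema {
        public final Order[] ORDERS = { new Order(1, 25), new Order(2, 10) };
    }
    public static class Order {
        public final int ID;
        public final int UNIT_PRICE;
        public Order(int id, int unitPrice) { this.ID = id; this.UNIT_PRICE = unitPrice; }
    }

    public static void main(String[] args) {
        SchemaPlus root = Frameworks.createRootSchema(true);
        SchemaPlus schema = root.add("S", new ReflectiveSchema(new OrdersSchema()));
        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(schema).build();

        RelBuilder builder = RelBuilder.create(config);
        // SELECT ID FROM ORDERS WHERE UNIT_PRICE > 20, built directly as relational algebra
        RelNode tree = builder
                .scan("ORDERS")
                .filter(builder.call(SqlStdOperatorTable.GREATER_THAN,
                        builder.field("UNIT_PRICE"), builder.literal(20)))
                .project(builder.field("ID"))
                .build();

        // Prints the tree: a Project on top of a Filter on top of a TableScan.
        System.out.println(RelOptUtil.toString(tree));
    }
}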

Step 4. submit a SQL

Handling SqlCreateFunction and SqlCreateTable is similar to EXPLAIN; the information is kept in a SchemaPlus as the source schema and the UDF/UDAF.

1). create QueryPlanner with schema

FrameworkConfig config = Frameworks.newConfigBuilder()
        .defaultSchema(schema)
        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
        .traitDefs(traitDefs)
        .context(Contexts.EMPTY_CONTEXT)
        .ruleSets(TridentStormRuleSets.getRuleSets())
        .costFactory(null)
        .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM)
        .build();
this.planner = Frameworks.getPlanner(config);

a. ruleSets(TridentStormRuleSets.getRuleSets())
TridentStormRuleSets.getRuleSets() returns all the rules for the planner: Calcite’s StreamRules.RULES plus the Calcite-to-Storm conversion rules (the same set referenced later in getPlan).

b. StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM
Here Storm overrides the maximum numeric precision to 38, from Calcite’s default of 19.
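
A minimal sketch of what such an override looks like in Calcite (not the actual StormRelDataTypeSystem source):

import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelDataTypeSystemImpl;

// Sketch only, not the actual StormRelDataTypeSystem source.
public class WidePrecisionTypeSystem extends RelDataTypeSystemImpl {
    public static final RelDataTypeSystem INSTANCE = new WidePrecisionTypeSystem();

    @Override
    public int getMaxNumericPrecision() {
        // Calcite's default is 19; raising it to 38 lets wider DECIMAL columns validate.
        return 38;
    }
}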

2). compile SQL to Trident

The actual logic is defined below; here I skip IAggregatableStream lastStream, as it’s for interactive streaming.

TridentRel relNode = getPlan(query);

TridentPlanCreator tridentPlanCreator = new TridentPlanCreator(sources, new RexBuilder(typeFactory));
relNode.tridentPlan(tridentPlanCreator);

final TridentTopology topology = tridentPlanCreator.getTopology();

a). L1, getPlan(query)

Back in the EXPLAIN part, the flow ends with a standard RelNode; that’s why I called it not related to Trident.

The function convertToStormRel is applied to the RelNode to generate extended TridentRel expressions, via

planner.transform(STORM_REL_CONVERSION_RULES, traitSet.plus(TridentLogicalConvention.INSTANCE), relNode);

Now the RelNode is translated to a TridentRel, which has a new interface tridentPlan; a separate post is planned to describe the details.

In this line of PlannerImpl,

Program program = programs.get(ruleSetIndex);

It points to the rule set defined in QueryPlanner as

addAll(StreamRules.RULES).addAll(calciteToStormConversionRules)

b). L3-4, create Trident Plan

Let’s take TridentFilterRel as an example:

RelNode input = getInput();
StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
Stream inputStream = planCreator.pop().toStream();

First of all, it creates the inputStream in Trident. For example, TridentStreamScanRel calls newStream(stageName, sources.get(sourceName).getProducer()). There are several DataSourcesProvider implementations in Storm SQL, like KafkaDataSourcesProvider, which returns an OpaqueTridentKafkaSpout.

Next, a filter function is applied to the inputStream,

inputStream.filter(new EvaluationFilter(expression, planCreator.getDataContext())).name(stageName);

Inside EvaluationFilter, there is a Janino ScriptEvaluator.
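
To show the shape of this, here is a cut-down sketch of a Trident filter driving a compiled Janino expression per tuple. It is illustrative only: it hard-codes one int field and one expression, while the real EvaluationFilter evaluates the compiled Calcite filter expression with the DataContext:

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
import org.codehaus.janino.ScriptEvaluator;

// Cut-down sketch, not the real EvaluationFilter.
public class JaninoFilterSketch extends BaseFilter {
    private transient ScriptEvaluator evaluator;

    @Override
    public boolean isKeep(TridentTuple tuple) {
        try {
            if (evaluator == null) {
                // Compile the expression lazily on the worker (filters are serialized to the cluster).
                evaluator = new ScriptEvaluator();
                evaluator.setParameters(new String[]{"unitPrice"}, new Class[]{int.class});
                evaluator.setReturnType(boolean.class);
                evaluator.cook("return unitPrice > 20;");   // e.g. WHERE UNIT_PRICE > 20
            }
            return (Boolean) evaluator.evaluate(new Object[]{tuple.getInteger(0)});
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}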

c). L6, get Trident Topology

After L3-4, the Trident Topology is already assembled, as TridentTopology topology in TridentPlanCreator.

3). package and submit to run

Back in submit in StormSqlImpl, the last step is to submit the topology to run:

jarPath = Files.createTempFile("storm-sql", ".jar");
System.setProperty("storm.jar", jarPath.toString());
packageEmptyTopology(jarPath);
StormSubmitter.submitTopologyAs(name, stormConf, topo.build(), opts, progressListener, asUser);

That’s the end of how Storm SQL is implemented. There are still several topics that are not expanded here:
1. The syntax and parser of Calcite streaming SQL;
2. Extending rules to support customized operations in the planner;
3. Defining a new data source adapter.
