问题描述:
使用Apache Kafka Cassandra Connector时,如果在查询过程中使用了使用了CassandraConnectorConfig.CassandraInputConverterClass配置项来设置自定义的输入转换器,则可能会导致查询过程中创建过多的查询请求,从而导致Cassandra集群过载,响应变慢。
解决方案:
为了解决这个问题,可以使用尽可能少的查询来获取尽可能多的数据。您可以使用单个查询语句来获取尽可能多的数据,而不是在多个查询中使用分片。以下是使用Java API获取尽可能多数据的示例。这将使用SELECT *从Cassandra表中检索所有数据行:
CassandraSourceSetting setting = CassandraSourceSetting
.builder()
.withNode(node)
.withKeyspace(keyspace)
.withTable(table)
.withCassandraConfigBuilder(configBuilder)
.build();
String query = String.format("SELECT * FROM %s.%s", keyspace, table);
DataStream stream = env.addSource(
new CassandraSource<>(
setting,
new RowCassandraInputFormat(query),
TypeInformation.of(Row.class)
)
);
参考:
https://docs.lenses.io/connectors/source/cassandra.html