Ändringsflöde i Azure Cosmos DB API för Cassandra
GÄLLER för:
API för Cassandra
Stöd för ändringsflöde i Azure Cosmos DB API för Cassandra är tillgängligt via fråge predikaten i Cassandra Query Language (CQL). Med hjälp av de här predikatvillkoren kan du köra frågor mot API:et för ändringsflödet. Program kan hämta de ändringar som gjorts i en tabell med hjälp av den primära nyckeln (kallas även partitionsnyckeln) som krävs i CQL. Du kan sedan vidta ytterligare åtgärder baserat på resultatet. Ändringar av raderna i tabellen avbildas i den ordning de ändras och sorteringsordningen per partitionsnyckel.
I följande exempel visas hur du hämtar en ändringsfeed på alla rader i en API för Cassandra keyspace-tabell med hjälp av .NET. Predikatet COSMOS_CHANGEFEED_START_TIME() används direkt i CQL för att fråga objekt i ändringsflödet från en angiven starttid (i det här fallet aktuell datetime). Du kan ladda ned det fullständiga exemplet för C# här och för Java här.
I varje iteration återupptas frågan vid den senaste tidpunkten då ändringar lästes, med hjälp av växlingstillstånd. Vi kan se en kontinuerlig ström av nya ändringar i tabellen i nyckelutrymmet. Vi kommer att se ändringar i rader som infogas eller uppdateras. Det finns för närvarande inte stöd för borttagningsåtgärder API för Cassandra ändringsfeed i en API för Cassandra.
Session cassandraSession = utils.getSession();
try {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);
String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='"
+ dtf.format(now)+ "'";
byte[] token=null;
System.out.println(query);
while(true)
{
SimpleStatement st=new SimpleStatement(query);
st.setFetchSize(100);
if(token!=null)
st.setPagingStateUnsafe(token);
ResultSet result=cassandraSession.execute(st) ;
token=result.getExecutionInfo().getPagingState().toBytes();
for(Row row:result)
{
System.out.println(row.getString("user_name"));
}
}
} finally {
utils.close();
LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
}
Du kan lägga till den primära nyckeln i frågan för att få ändringarna till en enskild rad efter primärnyckel. I följande exempel visas hur du spårar ändringar för raden där "user_id = 1"
String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='"
+ dtf.format(now)+ "'";
SimpleStatement st=new SimpleStatement(query);
Aktuella begränsningar
Följande begränsningar gäller när du använder ändringsflöde med API för Cassandra:
- Infogningar och uppdateringar stöds för närvarande. Borttagningsåtgärden stöds inte ännu. Som en tillfällig lösning kan du lägga till en mjuk markör på rader som tas bort. Du kan till exempel lägga till ett fält på raden med namnet "deleted" och ange det till "true".
- Den senaste uppdateringen bevaras som i core SQL API och mellanliggande uppdateringar av entiteten är inte tillgängliga.
Felhantering
Följande felkoder och meddelanden stöds när du använder ändringsflödet i API för Cassandra:
- HTTP-felkod 429 – När ändringsflödet är frekvensbegränsat returneras en tom sida.