SandroFalter-5926 asked PRADEEPCHEEKATLA-MSFT answered

Synapse Spark: Reading data from Cosmos DB - "Resolving Azure CosmosDB LinkedService in Azure Synapse failed. Access token couldn't be obtained"

Problem:
I tried to query data from the Cosmos DB analytical store, which is linked to my Synapse workspace, from a Spark notebook.

Configuration:
Cosmos DB:
- Azure Synapse Service Principal has Contributor role
- My Azure User Account has Contributor role

Azure Synapse Analytics
- My User Account has Owner Role

Azure Synapse Workspace
- My User Account has Synapse Administrator
- My User Account has Synapse Compute Operator Role


Spark:
- Spark 2.4
- Python 3.6
- Scala 2.11.12
- Java 1.8.0_272
- Delta Lake 0.6

Remarks:
I can query the analytical store from the serverless SQL pool. Only the Spark connection fails with this error.

Code:

 dfStream = spark.readStream\
     .format("cosmos.oltp")\
     .option("spark.synapse.linkedService", "CosmosDb_Test")\
     .option("spark.cosmos.container", "TestContainer")\
     .option("spark.cosmos.changeFeed.readEnabled", "true")\
     .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
     .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
     .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
     .load()
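Note that the `cosmos.oltp` format with `spark.cosmos.changeFeed.readEnabled` streams the change feed of the transactional store, while the analytical store mentioned in the problem statement is read in batch with the `cosmos.olap` format. A minimal sketch of that batch read, assuming the same linked service and container names as above; the helper function names are hypothetical. The read resolves the same linked service, so it is expected to fail with the same error until the linked service itself is fixed.

```python
from typing import Dict


def analytical_store_options(linked_service: str, container: str) -> Dict[str, str]:
    """Reader options that let Synapse resolve the Cosmos DB credentials from a linked service."""
    return {
        "spark.synapse.linkedService": linked_service,  # name of the Synapse linked service
        "spark.cosmos.container": container,            # container with the analytical store enabled
    }


def read_analytical_store(spark, linked_service: str, container: str):
    """Batch-read the analytical store; `spark` is the notebook's SparkSession."""
    return (
        spark.read
        .format("cosmos.olap")  # analytical (column) store, not the transactional store
        .options(**analytical_store_options(linked_service, container))
        .load()
    )


# Usage in a Synapse notebook:
# df = read_analytical_store(spark, "CosmosDb_Test", "TestContainer")
# df.show(5)
```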

Error:

 Py4JJavaError: An error occurred while calling o208.load.
     : java.lang.RuntimeException: Resolving Azure CosmosDB LinkedService [CosmosDb_Test] in Azure Synapse failed.Validate the configured LinkedService. If still seeing this, try using the Azure CosmosDB account name and credentials directly.
         at com.microsoft.azure.cosmos.analytics.spark.connector.common.ArcadiaLinkService$.fetchCosmosAccountInfo(ArcadiaLinkService.scala:112)
         at com.microsoft.azure.cosmos.analytics.spark.connector.common.ArcadiaLinkService.fetchCosmosAccountInfo(ArcadiaLinkService.scala:138)
         at com.microsoft.azure.cosmos.oltp.spark.ConnectionResolver.resolveAccountInfoThroughLinkedService(ConnectionResolver.scala:31)
         at com.microsoft.azure.cosmos.oltp.spark.CosmosOLTPSource.transformConfig(CosmosOLTPSource.scala:46)
         at com.microsoft.azure.cosmos.oltp.spark.CosmosOLTPSource.sourceSchema(CosmosOLTPSource.scala:53)
         at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:208)
         at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
         at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
         at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
         at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:171)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
         at py4j.Gateway.invoke(Gateway.java:282)
         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
         at py4j.commands.CallCommand.execute(CallCommand.java:79)
         at py4j.GatewayConnection.run(GatewayConnection.java:238)
         at java.lang.Thread.run(Thread.java:748)
     Caused by: java.lang.Exception: Access token couldn't be obtained {"result":"DependencyError","errorId":"BadRequest","errorMessage":"LSRServiceException is [{\"StatusCode\":400,\"ErrorResponse\":{\"code\":\"LSRLinkedServiceFailure\",\"message\":\"Could not load the Linked Service\",\"target\":\"CosmosDb_Test\"},\"Message\":\"Could not load the Linked Service\",\"Data\":{},\"InnerException\":null,\"StackTrace\":\"   at Microsoft.Marlin.Common.ADF.Impl.LSRClient.CheckForFailures(HttpResponseMessage response, String responseContent) in C:\\\\source\\\\Common\\\\Microsoft.Marlin.Common.ADF\\\\Impl\\\\LSRClient.cs:line 272\\r\\n   at Microsoft.Marlin.Common.ADF.Impl.LSRClient.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken, String traceId) in C:\\\\source\\\\Common\\\\Microsoft.Marlin.Common.ADF\\\\Impl\\\\LSRClient.cs:line 288\\r\\n   at Microsoft.Marlin.Common.ADF.Impl.LSRClient.ResolveLinkedServiceAsync(String linkedServiceName, ResolveAudienceRequest request, String traceId, CancellationToken cancellationToken) in C:\\\\source\\\\Common\\\\Microsoft.Marlin.Common.ADF\\\\Impl\\\\LSRClient.cs:line 185\\r\\n   at Microsoft.Marlin.TokenService.Token.LSRAudienceTokenProvider.GetToken(Boolean isLinkedService, String audience, String sessionToken, CancellationToken cancellationToken) in C:\\\\source\\\\TokenService\\\\Microsoft.Marlin.TokenService\\\\Token\\\\LSRAudienceTokenProvider.cs:line 129\\r\\n   at Microsoft.Marlin.TokenService.Token.LSRAudienceTokenProvider.GetTokenForAudienceAsync(Boolean isLinkedService, String audience, String account, String sessionToken, SignaturePayload signaturePayload, CancellationToken cancellationToken) in C:\\\\source\\\\TokenService\\\\Microsoft.Marlin.TokenService\\\\Token\\\\LSRAudienceTokenProvider.cs:line 67\\r\\n   at Microsoft.Marlin.TokenService.Controllers.TokenController.GetTokenAsync(TokenRequest request, CancellationToken cancellationToken) in 
C:\\\\source\\\\TokenService\\\\Microsoft.Marlin.TokenService\\\\Controllers\\\\TokenController.cs:line 67\\r\\n   at lambda_method(Closure , Object )\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ActionMethodExecutor.AwaitableObjectResultExecutor.Execute(IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.InvokeActionMethodAsync()\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.InvokeNextActionFilterAsync()\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.Rethrow(ActionExecutedContext context)\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.InvokeInnerFilterAsync()\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.InvokeNextResourceFilter()\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.Rethrow(ResourceExecutedContext context)\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.InvokeFilterPipelineAsync()\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.InvokeAsync()\\r\\n   at Microsoft.AspNetCore.Routing.EndpointMiddleware.Invoke(HttpContext httpContext)\\r\\n   at Microsoft.AspNetCore.Routing.EndpointRoutingMiddleware.Invoke(HttpContext httpContext)\\r\\n   at Microsoft.AspNetCore.StaticFiles.StaticFileMiddleware.Invoke(HttpContext context)\\r\\n   at Swashbuckle.AspNetCore.SwaggerUI.SwaggerUIMiddleware.Invoke(HttpContext httpContext)\\r\\n   at Swashbuckle.AspNetCore.Swagger.SwaggerMiddleware.Invoke(HttpContext httpContext, ISwaggerProvider swaggerProvider)\\r\\n   at Microsoft.AspNetCore.StaticFiles.StaticFileMiddleware.Invoke(HttpContext context)\\r\\n   at 
Microsoft.AspNetCore.Builder.Extensions.MapWhenMiddleware.Invoke(HttpContext context)\\r\\n   at Microsoft.AspNetCore.Diagnostics.ExceptionHandlerMiddleware.Invoke(HttpContext context)\",\"HelpLink\":null,\"Source\":\"Microsoft.Marlin.Common.ADF\",\"HResult\":-2146233088}]. TraceId : f099b286-8a14-4847-ab11-0b0434f6b9b1. Error Component : LSR"}
         at com.microsoft.azure.synapse.tokenlibrary.TokenLibrary$$anonfun$9.apply(TokenLibrary.scala:382)
         at com.microsoft.azure.synapse.tokenlibrary.TokenLibrary$$anonfun$9.apply(TokenLibrary.scala:374)
         at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:1808)
         at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:1807)
         at com.twitter.util.Promise$FutureTransformer.liftedTree1$1(Promise.scala:240)
         at com.twitter.util.Promise$FutureTransformer.k(Promise.scala:240)
         at com.twitter.util.Promise$Transformer.apply(Promise.scala:215)
         at com.twitter.util.Promise$WaitQueue.com$twitter$util$Promise$WaitQueue$$run(Promise.scala:91)
         at com.twitter.util.Promise$WaitQueue$$anon$4.run(Promise.scala:86)
         at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:198)
         at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157)
         at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:274)
         at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:109)
         at com.twitter.util.Promise$WaitQueue.runInScheduler(Promise.scala:86)
         at com.twitter.util.Promise.updateIfEmpty(Promise.scala:778)
         at com.twitter.util.Promise.update(Promise.scala:750)
         at com.twitter.util.Promise.setValue(Promise.scala:726)
         at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:123)
         at com.twitter.finagle.netty4.transport.ChannelTransport$$anon$2.channelRead(ChannelTransport.scala:168)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
         at com.twitter.finagle.netty4.http.handler.UnpoolHttpHandler$.channelRead(UnpoolHttpHandler.scala:32)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
         at com.twitter.finagle.netty4.http.handler.ClientExceptionMapper$.channelRead(ClientExceptionMapper.scala:35)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
         at com.twitter.finagle.netty4.http.handler.HeaderValidatorHandler$.channelRead(HeaderValidatorHandler.scala:51)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
         at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
         at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
         at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
         at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
         at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
         at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
         at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
         at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
         at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
         at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
         at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
         at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
         at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:483)
         at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:383)
         at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
         at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at com.twitter.finagle.util.BlockingTimeTrackingThreadFactory$$anon$1.run(BlockingTimeTrackingThreadFactory.scala:23)
         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
         ... 1 more
        
     Traceback (most recent call last):
        
       File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 400, in load
         return self._df(self._jreader.load())
        
       File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
         answer, self.gateway_client, self.target_id, self.name)
        
       File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
         return f(*a, **kw)
        
       File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
         format(target_id, ".", name), value)

Steps to reproduce:
1. Create a Cosmos DB account and a Synapse Analytics workspace.
2. Create the linked services and enable the Cosmos DB analytical store.
3. Run the code above in a Spark notebook.

azure-synapse-analytics azure-cosmos-db

1 Answer

PRADEEPCHEEKATLA-MSFT answered

Hello @SandroFalter-5926,

Welcome to the Microsoft Q&A platform.

The error message "Resolving Azure CosmosDB LinkedService [CosmosDb_Test] in Azure Synapse failed.Validate the configured LinkedService. If still seeing this, try using the Azure CosmosDB account name and credentials directly." indicates that Synapse could not resolve the linked service, so its configuration needs to be validated.
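The error text itself suggests a workaround: pass the Azure Cosmos DB account details directly instead of going through the linked service. The sketch below only builds the option dictionary for either path. The direct-credential keys (`spark.cosmos.accountEndpoint`, `spark.cosmos.accountKey`, `spark.cosmos.database`) are an assumption based on the Cosmos DB Spark connector's naming and should be checked against the connector documentation for your Spark pool version; the function name is hypothetical. In a real notebook the key should come from a secret store rather than from source code.

```python
from typing import Dict, Optional


def cosmos_source_options(
    container: str,
    linked_service: Optional[str] = None,
    account_endpoint: Optional[str] = None,
    account_key: Optional[str] = None,
    database: Optional[str] = None,
) -> Dict[str, str]:
    """Build connector options, preferring the linked service when one is given."""
    options = {"spark.cosmos.container": container}
    if linked_service:
        # Synapse resolves endpoint, key and database from the linked service.
        options["spark.synapse.linkedService"] = linked_service
        return options
    if account_endpoint and account_key and database:
        # ASSUMPTION: option names for direct credentials; verify them for your connector version.
        options.update({
            "spark.cosmos.accountEndpoint": account_endpoint,
            "spark.cosmos.accountKey": account_key,
            "spark.cosmos.database": database,
        })
        return options
    raise ValueError("Provide a linked service name, or an account endpoint, key and database.")


# Usage (change-feed options from the question are added afterwards):
# reader = spark.readStream.format("cosmos.oltp").options(
#     **cosmos_source_options("TestContainer", linked_service="CosmosDb_Test"))
```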

The stack trace shows that the Azure Cosmos DB linked service could not be loaded: "StatusCode":400,"ErrorResponse":{"code":"LSRLinkedServiceFailure","message":"Could not load the Linked Service"}.
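When this failure has to be reported or triaged repeatedly, it can help to pull the relevant fields out of the long error text. A minimal sketch using only the standard library; the function name is hypothetical, and the patterns assume the message layout shown in the question, which is not a documented contract. In a notebook, `message` would be the text of the caught `Py4JJavaError`.

```python
import re
from typing import Dict, Optional

# Patterns tuned to the "Access token couldn't be obtained" message shown in the question.
_PATTERNS = {
    "status_code": re.compile(r'"StatusCode"\s*:\s*(\d+)'),
    "error_code": re.compile(r'"code"\s*:\s*"([^"]+)"'),
    "target": re.compile(r'"target"\s*:\s*"([^"]+)"'),
    "trace_id": re.compile(r"TraceId\s*:\s*([0-9a-fA-F-]{36})"),
}


def summarize_lsr_error(message: str) -> Dict[str, Optional[str]]:
    """Extract status code, error code, linked service name and trace ID, or None if absent."""
    # The inner LSR payload is JSON escaped inside another JSON string, so unescape quotes first.
    text = message.replace('\\"', '"')
    summary: Dict[str, Optional[str]] = {}
    for name, pattern in _PATTERNS.items():
        match = pattern.search(text)
        summary[name] = match.group(1) if match else None
    return summary


# Usage in a notebook:
# try:
#     dfStream = spark.readStream.format("cosmos.oltp")...load()
# except Exception as e:
#     print(summarize_lsr_error(str(e)))
```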

You can try the steps below to connect to Azure Cosmos DB using Apache Spark 2 in Azure Synapse Link:

Step 1: Go to the Linked tab, select the Azure Cosmos DB linked service, and test the connection.

133591-image.png
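Besides the Test connection button, a quick read from the notebook shows whether the Spark connector itself can resolve the linked service, which is the step that fails in the question. A minimal sketch, assuming the container has the analytical store enabled (hence the default `cosmos.olap` format); the function name is hypothetical.

```python
from typing import Tuple


def check_cosmos_linked_service(spark, linked_service: str, container: str,
                                source_format: str = "cosmos.olap") -> Tuple[bool, str]:
    """Try to read one row through the linked service; return (ok, error_text)."""
    try:
        (
            spark.read
            .format(source_format)
            .option("spark.synapse.linkedService", linked_service)
            .option("spark.cosmos.container", container)
            .load()      # the connector resolves the linked service while building the schema
            .limit(1)
            .collect()   # forces an actual read of at most one row
        )
        return True, ""
    except Exception as exc:  # Py4JJavaError raised by the connector is a subclass of Exception
        return False, str(exc)


# Usage in a Synapse notebook:
# ok, err = check_cosmos_linked_service(spark, "CosmosDb_Test", "TestContainer")
# print("linked service OK" if ok else err)
```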

Step 2: Select the container and choose Actions > New notebook > Load streaming DataFrame from container.

133592-image.png
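`load()` only defines the streaming DataFrame; nothing is read until a streaming query is started with `writeStream`. A minimal sketch that appends the change feed to a Delta location; the sink format, output path and checkpoint folder are illustrative assumptions, and the function name is hypothetical. The reader's `spark.cosmos.changeFeed.checkpointLocation` and the sink's `checkpointLocation` are separate options.

```python
def start_change_feed_to_delta(df_stream, output_path: str, checkpoint_path: str):
    """Start a streaming query that appends change-feed rows to a Delta location."""
    return (
        df_stream.writeStream
        .format("delta")                                # requires Delta Lake on the Spark pool
        .outputMode("append")                           # each change-feed record becomes a new row
        .option("checkpointLocation", checkpoint_path)  # sink-side progress tracking
        .start(output_path)                             # returns a StreamingQuery
    )


# Usage in the notebook, after dfStream is defined:
# query = start_change_feed_to_delta(dfStream, "/cosmos/TestContainer_delta", "/localWriteCheckpointFolder")
# query.awaitTermination(60)  # block for up to 60 seconds; call query.stop() when done
```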

Step 3: Select the Spark pool and run the generated code to load the DataFrame from the container.

133438-image.png

For more details, refer to "Interact with Azure Cosmos DB using Apache Spark 2 in Azure Synapse Link".

Hope this helps. Please let us know if you have any further questions.
