from typing import Union

from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

USERPROFILE_DOC_TYPE = "userprofile"


def get_bucket() -> Bucket:
    """Connect to the Couchbase cluster and return the opened bucket.

    Authenticates with a username/password pair, opens ``bucket_name`` with
    ``LOCKMODE_WAIT`` and sets both timeouts on the returned bucket.
    """
    cluster = Cluster(
        "couchbase://couchbasehost:8091"
        "?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
    )
    # NOTE(review): host, bucket name and credentials are hard-coded
    # placeholders; presumably real deployments load them from configuration.
    authenticator = PasswordAuthenticator("username", "password")
    cluster.authenticate(authenticator)
    bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
    # Presumably seconds, mirroring the connection-string options - confirm
    # against the SDK documentation.
    bucket.timeout = 30
    bucket.n1ql_timeout = 300
    return bucket


class User(BaseModel):
    """Public view of a user profile (no password hash)."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User profile as stored in the bucket, including the password hash."""

    type: str = USERPROFILE_DOC_TYPE
    hashed_password: str


def get_user(bucket: Bucket, username: str) -> Union[UserInDB, None]:
    """Fetch the profile document for *username* from *bucket*.

    Returns:
        A ``UserInDB`` built from the document's value, or ``None`` when the
        lookup yields a falsy value.
    """
    doc_id = f"userprofile::{username}"
    # quiet=True presumably makes a missing key return an empty result
    # instead of raising - confirm against the SDK documentation.
    result = bucket.get(doc_id, quiet=True)
    if not result.value:
        return None
    return UserInDB(**result.value)


# FastAPI specific code
app = FastAPI()


@app.get("/users/{username}", response_model=User)
def read_user(username: str):
    """Return the public profile of *username*.

    Raises:
        HTTPException: with status 404 when no profile is found, so the
            declared ``User`` response model is never handed ``None``.
    """
    # NOTE(review): a new cluster connection is opened on every request.
    bucket = get_bucket()
    user = get_user(bucket=bucket, username=username)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
from typing import Union

from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

USERPROFILE_DOC_TYPE = "userprofile"


def get_bucket() -> Bucket:
    """Connect to the Couchbase cluster and return the opened bucket.

    Authenticates with a username/password pair, opens ``bucket_name`` with
    ``LOCKMODE_WAIT`` and sets both timeouts on the returned bucket.
    """
    cluster = Cluster(
        "couchbase://couchbasehost:8091"
        "?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
    )
    # NOTE(review): host, bucket name and credentials are hard-coded
    # placeholders; presumably real deployments load them from configuration.
    authenticator = PasswordAuthenticator("username", "password")
    cluster.authenticate(authenticator)
    bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
    # Presumably seconds, mirroring the connection-string options - confirm
    # against the SDK documentation.
    bucket.timeout = 30
    bucket.n1ql_timeout = 300
    return bucket


class User(BaseModel):
    """Public view of a user profile (no password hash)."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User profile as stored in the bucket, including the password hash."""

    type: str = USERPROFILE_DOC_TYPE
    hashed_password: str


def get_user(bucket: Bucket, username: str) -> Union[UserInDB, None]:
    """Fetch the profile document for *username* from *bucket*.

    Returns:
        A ``UserInDB`` built from the document's value, or ``None`` when the
        lookup yields a falsy value.
    """
    doc_id = f"userprofile::{username}"
    # quiet=True presumably makes a missing key return an empty result
    # instead of raising - confirm against the SDK documentation.
    result = bucket.get(doc_id, quiet=True)
    if not result.value:
        return None
    return UserInDB(**result.value)


# FastAPI specific code
app = FastAPI()


@app.get("/users/{username}", response_model=User)
def read_user(username: str):
    """Return the public profile of *username*.

    Raises:
        HTTPException: with status 404 when no profile is found, so the
            declared ``User`` response model is never handed ``None``.
    """
    # NOTE(review): a new cluster connection is opened on every request.
    bucket = get_bucket()
    user = get_user(bucket=bucket, username=username)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
from typing import Union

from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

USERPROFILE_DOC_TYPE = "userprofile"


def get_bucket() -> Bucket:
    """Connect to the Couchbase cluster and return the opened bucket.

    Authenticates with a username/password pair, opens ``bucket_name`` with
    ``LOCKMODE_WAIT`` and sets both timeouts on the returned bucket.
    """
    cluster = Cluster(
        "couchbase://couchbasehost:8091"
        "?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
    )
    # NOTE(review): host, bucket name and credentials are hard-coded
    # placeholders; presumably real deployments load them from configuration.
    authenticator = PasswordAuthenticator("username", "password")
    cluster.authenticate(authenticator)
    bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
    # Presumably seconds, mirroring the connection-string options - confirm
    # against the SDK documentation.
    bucket.timeout = 30
    bucket.n1ql_timeout = 300
    return bucket


class User(BaseModel):
    """Public view of a user profile (no password hash)."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User profile as stored in the bucket, including the password hash."""

    type: str = USERPROFILE_DOC_TYPE
    hashed_password: str


def get_user(bucket: Bucket, username: str) -> Union[UserInDB, None]:
    """Fetch the profile document for *username* from *bucket*.

    Returns:
        A ``UserInDB`` built from the document's value, or ``None`` when the
        lookup yields a falsy value.
    """
    doc_id = f"userprofile::{username}"
    # quiet=True presumably makes a missing key return an empty result
    # instead of raising - confirm against the SDK documentation.
    result = bucket.get(doc_id, quiet=True)
    if not result.value:
        return None
    return UserInDB(**result.value)


# FastAPI specific code
app = FastAPI()


@app.get("/users/{username}", response_model=User)
def read_user(username: str):
    """Return the public profile of *username*.

    Raises:
        HTTPException: with status 404 when no profile is found, so the
            declared ``User`` response model is never handed ``None``.
    """
    # NOTE(review): a new cluster connection is opened on every request.
    bucket = get_bucket()
    user = get_user(bucket=bucket, username=username)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
from typing import Union

from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

USERPROFILE_DOC_TYPE = "userprofile"


def get_bucket() -> Bucket:
    """Connect to the Couchbase cluster and return the opened bucket.

    Authenticates with a username/password pair, opens ``bucket_name`` with
    ``LOCKMODE_WAIT`` and sets both timeouts on the returned bucket.
    """
    cluster = Cluster(
        "couchbase://couchbasehost:8091"
        "?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
    )
    # NOTE(review): host, bucket name and credentials are hard-coded
    # placeholders; presumably real deployments load them from configuration.
    authenticator = PasswordAuthenticator("username", "password")
    cluster.authenticate(authenticator)
    bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
    # Presumably seconds, mirroring the connection-string options - confirm
    # against the SDK documentation.
    bucket.timeout = 30
    bucket.n1ql_timeout = 300
    return bucket


class User(BaseModel):
    """Public view of a user profile (no password hash)."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User profile as stored in the bucket, including the password hash."""

    type: str = USERPROFILE_DOC_TYPE
    hashed_password: str


def get_user(bucket: Bucket, username: str) -> Union[UserInDB, None]:
    """Fetch the profile document for *username* from *bucket*.

    Returns:
        A ``UserInDB`` built from the document's value, or ``None`` when the
        lookup yields a falsy value.
    """
    doc_id = f"userprofile::{username}"
    # quiet=True presumably makes a missing key return an empty result
    # instead of raising - confirm against the SDK documentation.
    result = bucket.get(doc_id, quiet=True)
    if not result.value:
        return None
    return UserInDB(**result.value)


# FastAPI specific code
app = FastAPI()


@app.get("/users/{username}", response_model=User)
def read_user(username: str):
    """Return the public profile of *username*.

    Raises:
        HTTPException: with status 404 when no profile is found, so the
            declared ``User`` response model is never handed ``None``.
    """
    # NOTE(review): a new cluster connection is opened on every request.
    bucket = get_bucket()
    user = get_user(bucket=bucket, username=username)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
from typing import Union

from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

USERPROFILE_DOC_TYPE = "userprofile"


def get_bucket() -> Bucket:
    """Connect to the Couchbase cluster and return the opened bucket.

    Authenticates with a username/password pair, opens ``bucket_name`` with
    ``LOCKMODE_WAIT`` and sets both timeouts on the returned bucket.
    """
    cluster = Cluster(
        "couchbase://couchbasehost:8091"
        "?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
    )
    # NOTE(review): host, bucket name and credentials are hard-coded
    # placeholders; presumably real deployments load them from configuration.
    authenticator = PasswordAuthenticator("username", "password")
    cluster.authenticate(authenticator)
    bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
    # Presumably seconds, mirroring the connection-string options - confirm
    # against the SDK documentation.
    bucket.timeout = 30
    bucket.n1ql_timeout = 300
    return bucket


class User(BaseModel):
    """Public view of a user profile (no password hash)."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User profile as stored in the bucket, including the password hash."""

    type: str = USERPROFILE_DOC_TYPE
    hashed_password: str


def get_user(bucket: Bucket, username: str) -> Union[UserInDB, None]:
    """Fetch the profile document for *username* from *bucket*.

    Returns:
        A ``UserInDB`` built from the document's value, or ``None`` when the
        lookup yields a falsy value.
    """
    doc_id = f"userprofile::{username}"
    # quiet=True presumably makes a missing key return an empty result
    # instead of raising - confirm against the SDK documentation.
    result = bucket.get(doc_id, quiet=True)
    if not result.value:
        return None
    return UserInDB(**result.value)


# FastAPI specific code
app = FastAPI()


@app.get("/users/{username}", response_model=User)
def read_user(username: str):
    """Return the public profile of *username*.

    Raises:
        HTTPException: with status 404 when no profile is found, so the
            declared ``User`` response model is never handed ``None``.
    """
    # NOTE(review): a new cluster connection is opened on every request.
    bucket = get_bucket()
    user = get_user(bucket=bucket, username=username)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
from typing import Union

from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

USERPROFILE_DOC_TYPE = "userprofile"


def get_bucket() -> Bucket:
    """Connect to the Couchbase cluster and return the opened bucket.

    Authenticates with a username/password pair, opens ``bucket_name`` with
    ``LOCKMODE_WAIT`` and sets both timeouts on the returned bucket.
    """
    cluster = Cluster(
        "couchbase://couchbasehost:8091"
        "?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
    )
    # NOTE(review): host, bucket name and credentials are hard-coded
    # placeholders; presumably real deployments load them from configuration.
    authenticator = PasswordAuthenticator("username", "password")
    cluster.authenticate(authenticator)
    bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
    # Presumably seconds, mirroring the connection-string options - confirm
    # against the SDK documentation.
    bucket.timeout = 30
    bucket.n1ql_timeout = 300
    return bucket


class User(BaseModel):
    """Public view of a user profile (no password hash)."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User profile as stored in the bucket, including the password hash."""

    type: str = USERPROFILE_DOC_TYPE
    hashed_password: str


def get_user(bucket: Bucket, username: str) -> Union[UserInDB, None]:
    """Fetch the profile document for *username* from *bucket*.

    Returns:
        A ``UserInDB`` built from the document's value, or ``None`` when the
        lookup yields a falsy value.
    """
    doc_id = f"userprofile::{username}"
    # quiet=True presumably makes a missing key return an empty result
    # instead of raising - confirm against the SDK documentation.
    result = bucket.get(doc_id, quiet=True)
    if not result.value:
        return None
    return UserInDB(**result.value)


# FastAPI specific code
app = FastAPI()


@app.get("/users/{username}", response_model=User)
def read_user(username: str):
    """Return the public profile of *username*.

    Raises:
        HTTPException: with status 404 when no profile is found, so the
            declared ``User`` response model is never handed ``None``.
    """
    # NOTE(review): a new cluster connection is opened on every request.
    bucket = get_bucket()
    user = get_user(bucket=bucket, username=username)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
from typing import Union

from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

USERPROFILE_DOC_TYPE = "userprofile"


def get_bucket() -> Bucket:
    """Connect to the Couchbase cluster and return the opened bucket.

    Authenticates with a username/password pair, opens ``bucket_name`` with
    ``LOCKMODE_WAIT`` and sets both timeouts on the returned bucket.
    """
    cluster = Cluster(
        "couchbase://couchbasehost:8091"
        "?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
    )
    # NOTE(review): host, bucket name and credentials are hard-coded
    # placeholders; presumably real deployments load them from configuration.
    authenticator = PasswordAuthenticator("username", "password")
    cluster.authenticate(authenticator)
    bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
    # Presumably seconds, mirroring the connection-string options - confirm
    # against the SDK documentation.
    bucket.timeout = 30
    bucket.n1ql_timeout = 300
    return bucket


class User(BaseModel):
    """Public view of a user profile (no password hash)."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User profile as stored in the bucket, including the password hash."""

    type: str = USERPROFILE_DOC_TYPE
    hashed_password: str


def get_user(bucket: Bucket, username: str) -> Union[UserInDB, None]:
    """Fetch the profile document for *username* from *bucket*.

    Returns:
        A ``UserInDB`` built from the document's value, or ``None`` when the
        lookup yields a falsy value.
    """
    doc_id = f"userprofile::{username}"
    # quiet=True presumably makes a missing key return an empty result
    # instead of raising - confirm against the SDK documentation.
    result = bucket.get(doc_id, quiet=True)
    if not result.value:
        return None
    return UserInDB(**result.value)


# FastAPI specific code
app = FastAPI()


@app.get("/users/{username}", response_model=User)
def read_user(username: str):
    """Return the public profile of *username*.

    Raises:
        HTTPException: with status 404 when no profile is found, so the
            declared ``User`` response model is never handed ``None``.
    """
    # NOTE(review): a new cluster connection is opened on every request.
    bucket = get_bucket()
    user = get_user(bucket=bucket, username=username)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
from typing import Union

from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

USERPROFILE_DOC_TYPE = "userprofile"


def get_bucket() -> Bucket:
    """Connect to the Couchbase cluster and return the opened bucket.

    Authenticates with a username/password pair, opens ``bucket_name`` with
    ``LOCKMODE_WAIT`` and sets both timeouts on the returned bucket.
    """
    cluster = Cluster(
        "couchbase://couchbasehost:8091"
        "?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
    )
    # NOTE(review): host, bucket name and credentials are hard-coded
    # placeholders; presumably real deployments load them from configuration.
    authenticator = PasswordAuthenticator("username", "password")
    cluster.authenticate(authenticator)
    bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
    # Presumably seconds, mirroring the connection-string options - confirm
    # against the SDK documentation.
    bucket.timeout = 30
    bucket.n1ql_timeout = 300
    return bucket


class User(BaseModel):
    """Public view of a user profile (no password hash)."""

    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User profile as stored in the bucket, including the password hash."""

    type: str = USERPROFILE_DOC_TYPE
    hashed_password: str


def get_user(bucket: Bucket, username: str) -> Union[UserInDB, None]:
    """Fetch the profile document for *username* from *bucket*.

    Returns:
        A ``UserInDB`` built from the document's value, or ``None`` when the
        lookup yields a falsy value.
    """
    doc_id = f"userprofile::{username}"
    # quiet=True presumably makes a missing key return an empty result
    # instead of raising - confirm against the SDK documentation.
    result = bucket.get(doc_id, quiet=True)
    if not result.value:
        return None
    return UserInDB(**result.value)


# FastAPI specific code
app = FastAPI()


@app.get("/users/{username}", response_model=User)
def read_user(username: str):
    """Return the public profile of *username*.

    Raises:
        HTTPException: with status 404 when no profile is found, so the
            declared ``User`` response model is never handed ``None``.
    """
    # NOTE(review): a new cluster connection is opened on every request.
    bucket = get_bucket()
    user = get_user(bucket=bucket, username=username)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user