Back to index

openldap  2.4.31
ndbio.cpp
Go to the documentation of this file.
00001 /* ndbio.cpp - get/set/del data for NDB */
00002 /* $OpenLDAP$ */
00003 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
00004  *
00005  * Copyright 2008-2012 The OpenLDAP Foundation.
00006  * All rights reserved.
00007  *
00008  * Redistribution and use in source and binary forms, with or without
00009  * modification, are permitted only as authorized by the OpenLDAP
00010  * Public License.
00011  *
00012  * A copy of this license is available in the file LICENSE in the
00013  * top-level directory of the distribution or, alternatively, at
00014  * <http://www.OpenLDAP.org/license.html>.
00015  */
00016 /* ACKNOWLEDGEMENTS:
00017  * This work was initially developed by Howard Chu for inclusion
00018  * in OpenLDAP Software. This work was sponsored by MySQL.
00019  */
00020 
00021 #include "portable.h"
00022 
00023 #include <stdio.h>
00024 #include <ac/string.h>
00025 #include <ac/errno.h>
00026 #include <lutil.h>
00027 
00028 #include "back-ndb.h"
00029 
00030 /* For reference only */
00031 typedef struct MedVar {
00032        Int16 len;    /* length is always little-endian */
00033        char buf[1024];
00034 } MedVar;
00035 
00036 extern "C" {
00037        static int ndb_name_cmp( const void *v1, const void *v2 );
00038        static int ndb_oc_dup_err( void *v1, void *v2 );
00039 };
00040 
00041 static int
00042 ndb_name_cmp( const void *v1, const void *v2 )
00043 {
00044        NdbOcInfo *oc1 = (NdbOcInfo *)v1, *oc2 = (NdbOcInfo *)v2;
00045        return ber_bvstrcasecmp( &oc1->no_name, &oc2->no_name );
00046 }
00047 
00048 static int
00049 ndb_oc_dup_err( void *v1, void *v2 )
00050 {
00051        NdbOcInfo *oc = (NdbOcInfo *)v2;
00052 
00053        oc->no_oc = (ObjectClass *)v1;
00054        return -1;
00055 }
00056 
00057 /* Find an existing NdbAttrInfo */
00058 extern "C" NdbAttrInfo *
00059 ndb_ai_find( struct ndb_info *ni, AttributeType *at )
00060 {
00061        NdbAttrInfo atmp;
00062        atmp.na_name = at->sat_cname;
00063 
00064        return (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
00065 }
00066 
00067 /* Find or create an NdbAttrInfo */
00068 extern "C" NdbAttrInfo *
00069 ndb_ai_get( struct ndb_info *ni, struct berval *aname )
00070 {
00071        NdbAttrInfo atmp, *ai;
00072        atmp.na_name = *aname;
00073 
00074        ai = (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
00075        if ( !ai ) {
00076               const char *text;
00077               AttributeDescription *ad = NULL;
00078 
00079               if ( slap_bv2ad( aname, &ad, &text ))
00080                      return NULL;
00081 
00082               ai = (NdbAttrInfo *)ch_malloc( sizeof( NdbAttrInfo ));
00083               ai->na_desc = ad;
00084               ai->na_attr = ai->na_desc->ad_type;
00085               ai->na_name = ai->na_attr->sat_cname;
00086               ai->na_oi = NULL;
00087               ai->na_flag = 0;
00088               ai->na_ixcol = 0;
00089               ai->na_len = ai->na_attr->sat_atype.at_syntax_len;
00090               /* Reasonable default */
00091               if ( !ai->na_len ) {
00092                      if ( ai->na_attr->sat_syntax == slap_schema.si_syn_distinguishedName )
00093                             ai->na_len = 1024;
00094                      else
00095                             ai->na_len = 128;
00096               }
00097               /* Arbitrary limit */
00098               if ( ai->na_len > 1024 )
00099                      ai->na_len = 1024;
00100               avl_insert( &ni->ni_ai_tree, ai, ndb_name_cmp, avl_dup_error );
00101        }
00102        return ai;
00103 }
00104 
00105 static int
00106 ndb_ai_check( struct ndb_info *ni, NdbOcInfo *oci, AttributeType **attrs, char **ptr, int *col,
00107        int create )
00108 {
00109        NdbAttrInfo *ai;
00110        int i;
00111 
00112        for ( i=0; attrs[i]; i++ ) {
00113               if ( attrs[i] == slap_schema.si_ad_objectClass->ad_type )
00114                      continue;
00115               /* skip attrs that are in a superior */
00116               if ( oci->no_oc && oci->no_oc->soc_sups ) {
00117                      int j, k, found=0;
00118                      ObjectClass *oc;
00119                      for ( j=0; oci->no_oc->soc_sups[j]; j++ ) {
00120                             oc = oci->no_oc->soc_sups[j];
00121                             if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
00122                                    continue;
00123                             if ( oc->soc_required ) {
00124                                    for ( k=0; oc->soc_required[k]; k++ ) {
00125                                           if ( attrs[i] == oc->soc_required[k] ) {
00126                                                  found = 1;
00127                                                  break;
00128                                           }
00129                                    }
00130                                    if ( found ) break;
00131                             }
00132                             if ( oc->soc_allowed ) {
00133                                    for ( k=0; oc->soc_allowed[k]; k++ ) {
00134                                           if ( attrs[i] == oc->soc_allowed[k] ) {
00135                                                  found = 1;
00136                                                  break;
00137                                           }
00138                                    }
00139                                    if ( found ) break;
00140                             }
00141                      }
00142                      if ( found )
00143                             continue;
00144               }
00145 
00146               ai = ndb_ai_get( ni, &attrs[i]->sat_cname );
00147               if ( !ai ) {
00148                      /* can never happen */
00149                      return LDAP_OTHER;
00150               }
00151 
00152               /* An attrset may have already been connected */
00153               if (( oci->no_flag & NDB_INFO_ATSET ) && ai->na_oi == oci )
00154                      continue;
00155 
00156               /* An indexed attr is defined before its OC is */
00157               if ( !ai->na_oi ) {
00158                      ai->na_oi = oci;
00159                      ai->na_column = (*col)++;
00160               }
00161 
00162               oci->no_attrs[oci->no_nattrs++] = ai;
00163 
00164               /* An attrset attr may already be defined */
00165               if ( ai->na_oi != oci ) {
00166                      int j;
00167                      for ( j=0; j<oci->no_nsets; j++ )
00168                             if ( oci->no_sets[j] == ai->na_oi ) break;
00169                      if ( j >= oci->no_nsets ) {
00170                             /* FIXME: data loss if more sets are in use */
00171                             if ( oci->no_nsets < NDB_MAX_OCSETS ) {
00172                                    oci->no_sets[oci->no_nsets++] = ai->na_oi;
00173                             }
00174                      }
00175                      continue;
00176               }
00177 
00178               if ( create ) {
00179                      if ( ai->na_flag & NDB_INFO_ATBLOB ) {
00180                             *ptr += sprintf( *ptr, ", `%s` BLOB", ai->na_attr->sat_cname.bv_val );
00181                      } else {
00182                             *ptr += sprintf( *ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
00183                                    ai->na_len );
00184                      }
00185               }
00186        }
00187        return 0;
00188 }
00189 
00190 static int
00191 ndb_oc_create( struct ndb_info *ni, NdbOcInfo *oci, int create )
00192 {
00193        char buf[4096], *ptr;
00194        int i, rc = 0, col;
00195 
00196        if ( create ) {
00197               ptr = buf + sprintf( buf,
00198                      "CREATE TABLE `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
00199                      oci->no_table.bv_val );
00200        }
00201 
00202        col = 0;
00203        if ( oci->no_oc->soc_required ) {
00204               for ( i=0; oci->no_oc->soc_required[i]; i++ );
00205               col += i;
00206        }
00207        if ( oci->no_oc->soc_allowed ) {
00208               for ( i=0; oci->no_oc->soc_allowed[i]; i++ );
00209               col += i;
00210        }
00211        /* assume all are present */
00212        oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
00213 
00214        col = 2;
00215        ldap_pvt_thread_rdwr_wlock( &ni->ni_ai_rwlock );
00216        if ( oci->no_oc->soc_required ) {
00217               rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, &ptr, &col, create );
00218        }
00219        if ( !rc && oci->no_oc->soc_allowed ) {
00220               rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, &ptr, &col, create );
00221        }
00222        ldap_pvt_thread_rdwr_wunlock( &ni->ni_ai_rwlock );
00223 
00224        /* shrink down to just the needed size */
00225        oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
00226               oci->no_nattrs * sizeof(struct ndb_attrinfo *));
00227 
00228        if ( create ) {
00229               ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
00230               rc = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
00231               if ( rc ) {
00232                      Debug( LDAP_DEBUG_ANY,
00233                             "ndb_oc_create: CREATE TABLE %s failed, %s (%d)\n",
00234                             oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
00235               }
00236        }
00237        return rc;
00238 }
00239 
00240 /* Read table definitions from the DB and populate ObjectClassInfo */
00241 extern "C" int
00242 ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
00243 {
00244        const NdbDictionary::Table *myTable;
00245        const NdbDictionary::Column *myCol;
00246        NdbOcInfo *oci, octmp;
00247        NdbAttrInfo *ai;
00248        ObjectClass *oc;
00249        NdbDictionary::Dictionary::List myList;
00250        struct berval bv;
00251        int i, j, rc, col;
00252 
00253        rc = myDict->listObjects( myList, NdbDictionary::Object::UserTable );
00254        /* Populate our objectClass structures */
00255        for ( i=0; i<myList.count; i++ ) {
00256               /* Ignore other DBs */
00257               if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
00258                      continue;
00259               /* Ignore internal tables */
00260               if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
00261                      continue;
00262               ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
00263               oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
00264               if ( oci )
00265                      continue;
00266 
00267               oc = oc_bvfind( &octmp.no_name );
00268               if ( !oc ) {
00269                      /* undefined - shouldn't happen */
00270                      continue;
00271               }
00272               myTable = myDict->getTable( myList.elements[i].name );
00273               oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
00274               oci->no_table.bv_val = (char *)(oci+1);
00275               strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
00276               oci->no_table.bv_len = oc->soc_cname.bv_len;
00277               oci->no_name = oci->no_table;
00278               oci->no_oc = oc;
00279               oci->no_flag = 0;
00280               oci->no_nsets = 0;
00281               oci->no_nattrs = 0;
00282               col = 0;
00283               /* Make space for all attrs, even tho sups will be dropped */
00284               if ( oci->no_oc->soc_required ) {
00285                      for ( j=0; oci->no_oc->soc_required[j]; j++ );
00286                      col = j;
00287               }
00288               if ( oci->no_oc->soc_allowed ) {
00289                      for ( j=0; oci->no_oc->soc_allowed[j]; j++ );
00290                      col += j;
00291               }
00292               oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
00293               avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
00294 
00295               col = myTable->getNoOfColumns();
00296               /* Skip 0 and 1, eid and vid */
00297               for ( j = 2; j<col; j++ ) {
00298                      myCol = myTable->getColumn( j );
00299                      ber_str2bv( myCol->getName(), 0, 0, &bv );
00300                      ai = ndb_ai_get( ni, &bv );
00301                      /* shouldn't happen */
00302                      if ( !ai )
00303                             continue;
00304                      ai->na_oi = oci;
00305                      ai->na_column = j;
00306                      ai->na_len = myCol->getLength();
00307                      if ( myCol->getType() == NdbDictionary::Column::Blob )
00308                             ai->na_flag |= NDB_INFO_ATBLOB;
00309               }
00310        }
00311        /* Link to any attrsets */
00312        for ( i=0; i<myList.count; i++ ) {
00313               /* Ignore other DBs */
00314               if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
00315                      continue;
00316               /* Ignore internal tables */
00317               if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
00318                      continue;
00319               ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
00320               oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
00321               /* shouldn't happen */
00322               if ( !oci )
00323                      continue;
00324               col = 2;
00325               if ( oci->no_oc->soc_required ) {
00326                      rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, NULL, &col, 0 );
00327               }
00328               if ( oci->no_oc->soc_allowed ) {
00329                      rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, NULL, &col, 0 );
00330               }
00331               /* shrink down to just the needed size */
00332               oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
00333                      oci->no_nattrs * sizeof(struct ndb_attrinfo *));
00334        }
00335        return 0;
00336 }
00337 
00338 static int
00339 ndb_oc_list( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
00340        struct berval *oname, int implied, NdbOcs *out )
00341 {
00342        const NdbDictionary::Table *myTable;
00343        NdbOcInfo *oci, octmp;
00344        ObjectClass *oc;
00345        int i, rc;
00346 
00347        /* shortcut top */
00348        if ( ber_bvstrcasecmp( oname, &slap_schema.si_oc_top->soc_cname )) {
00349               octmp.no_name = *oname;
00350               oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
00351               if ( oci ) {
00352                      oc = oci->no_oc;
00353               } else {
00354                      oc = oc_bvfind( oname );
00355                      if ( !oc ) {
00356                             /* undefined - shouldn't happen */
00357                             return LDAP_INVALID_SYNTAX;
00358                      }
00359               }
00360               if ( oc->soc_sups ) {
00361                      int i;
00362 
00363                      for ( i=0; oc->soc_sups[i]; i++ ) {
00364                             rc = ndb_oc_list( ni, myDict, &oc->soc_sups[i]->soc_cname, 1, out );
00365                             if ( rc ) return rc;
00366                      }
00367               }
00368        } else {
00369               oc = slap_schema.si_oc_top;
00370        }
00371        /* Only insert once */
00372        for ( i=0; i<out->no_ntext; i++ )
00373               if ( out->no_text[i].bv_val == oc->soc_cname.bv_val )
00374                      break;
00375        if ( i == out->no_ntext ) {
00376               for ( i=0; i<out->no_nitext; i++ )
00377                      if ( out->no_itext[i].bv_val == oc->soc_cname.bv_val )
00378                             break;
00379               if ( i == out->no_nitext ) {
00380                      if ( implied )
00381                             out->no_itext[out->no_nitext++] = oc->soc_cname;
00382                      else
00383                             out->no_text[out->no_ntext++] = oc->soc_cname;
00384               }
00385        }
00386 
00387        /* ignore top, etc... */
00388        if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
00389               return 0;
00390 
00391        if ( !oci ) {
00392               ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
00393               oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
00394               oci->no_table.bv_val = (char *)(oci+1);
00395               strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
00396               oci->no_table.bv_len = oc->soc_cname.bv_len;
00397               oci->no_name = oci->no_table;
00398               oci->no_oc = oc;
00399               oci->no_flag = 0;
00400               oci->no_nsets = 0;
00401               oci->no_nattrs = 0;
00402               ldap_pvt_thread_rdwr_wlock( &ni->ni_oc_rwlock );
00403               if ( avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, ndb_oc_dup_err )) {
00404                      octmp.no_oc = oci->no_oc;
00405                      ch_free( oci );
00406                      oci = (NdbOcInfo *)octmp.no_oc;
00407               }
00408               /* see if the oc table already exists in the DB */
00409               myTable = myDict->getTable( oci->no_table.bv_val );
00410               rc = ndb_oc_create( ni, oci, myTable == NULL );
00411               ldap_pvt_thread_rdwr_wunlock( &ni->ni_oc_rwlock );
00412               ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
00413               if ( rc ) return rc;
00414        }
00415        /* Only insert once */
00416        for ( i=0; i<out->no_ninfo; i++ )
00417               if ( out->no_info[i] == oci )
00418                      break;
00419        if ( i == out->no_ninfo )
00420               out->no_info[out->no_ninfo++] = oci;
00421        return 0;
00422 }
00423 
00424 extern "C" int
00425 ndb_aset_get( struct ndb_info *ni, struct berval *sname, struct berval *attrs, NdbOcInfo **ret )
00426 {
00427        NdbOcInfo *oci, octmp;
00428        int i, rc;
00429 
00430        octmp.no_name = *sname;
00431        oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
00432        if ( oci )
00433               return LDAP_ALREADY_EXISTS;
00434 
00435        for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ ) {
00436               if ( !at_bvfind( &attrs[i] ))
00437                      return LDAP_NO_SUCH_ATTRIBUTE;
00438        }
00439        i++;
00440 
00441        oci = (NdbOcInfo *)ch_calloc( 1, sizeof( NdbOcInfo ) + sizeof( ObjectClass ) +
00442               i*sizeof(AttributeType *) + sname->bv_len+1 );
00443        oci->no_oc = (ObjectClass *)(oci+1);
00444        oci->no_oc->soc_required = (AttributeType **)(oci->no_oc+1);
00445        oci->no_table.bv_val = (char *)(oci->no_oc->soc_required+i);
00446 
00447        for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ )
00448               oci->no_oc->soc_required[i] = at_bvfind( &attrs[i] );
00449 
00450        strcpy( oci->no_table.bv_val, sname->bv_val );
00451        oci->no_table.bv_len = sname->bv_len;
00452        oci->no_name = oci->no_table;
00453        oci->no_oc->soc_cname = oci->no_name;
00454        oci->no_flag = NDB_INFO_ATSET;
00455 
00456        if ( !ber_bvcmp( sname, &slap_schema.si_oc_extensibleObject->soc_cname ))
00457               oci->no_oc->soc_kind = slap_schema.si_oc_extensibleObject->soc_kind;
00458 
00459        rc = ndb_oc_create( ni, oci, 0 );
00460        if ( !rc )
00461               rc = avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
00462        if ( rc ) {
00463               ch_free( oci );
00464        } else {
00465               *ret = oci;
00466        }
00467        return rc;
00468 }
00469 
00470 extern "C" int
00471 ndb_aset_create( struct ndb_info *ni, NdbOcInfo *oci )
00472 {
00473        char buf[4096], *ptr;
00474        NdbAttrInfo *ai;
00475        int i;
00476 
00477        ptr = buf + sprintf( buf,
00478               "CREATE TABLE IF NOT EXISTS `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
00479               oci->no_table.bv_val );
00480 
00481        for ( i=0; i<oci->no_nattrs; i++ ) {
00482               if ( oci->no_attrs[i]->na_oi != oci )
00483                      continue;
00484               ai = oci->no_attrs[i];
00485               ptr += sprintf( ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
00486                      ai->na_len );
00487               if ( ai->na_flag & NDB_INFO_INDEX ) {
00488                      ptr += sprintf( ptr, ", INDEX (`%s`)", ai->na_attr->sat_cname.bv_val );
00489               }
00490        }
00491        ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
00492        i = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
00493        if ( i ) {
00494               Debug( LDAP_DEBUG_ANY,
00495                      "ndb_aset_create: CREATE TABLE %s failed, %s (%d)\n",
00496                      oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
00497        }
00498        return i;
00499 }
00500 
00501 static int
00502 ndb_oc_check( BackendDB *be, Ndb *ndb,
00503        struct berval *ocsin, NdbOcs *out )
00504 {
00505        struct ndb_info *ni = (struct ndb_info *) be->be_private;
00506        const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
00507 
00508        int i, rc = 0;
00509 
00510        out->no_ninfo = 0;
00511        out->no_ntext = 0;
00512        out->no_nitext = 0;
00513 
00514        /* Find all objectclasses and their superiors. List
00515         * the superiors first.
00516         */
00517 
00518        ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
00519        for ( i=0; !BER_BVISNULL( &ocsin[i] ); i++ ) {
00520               rc = ndb_oc_list( ni, myDict, &ocsin[i], 0, out );
00521               if ( rc ) break;
00522        }
00523        ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
00524        return rc;
00525 }
00526 
00527 #define       V_INS  1
00528 #define       V_DEL  2
00529 #define       V_REP  3
00530 
00531 static int ndb_flush_blobs;
00532 
00533 /* set all the unique attrs of this objectclass into the table
00534  */
00535 extern "C" int
00536 ndb_oc_attrs(
00537        NdbTransaction *txn,
00538        const NdbDictionary::Table *myTable,
00539        Entry *e,
00540        NdbOcInfo *no,
00541        NdbAttrInfo **attrs,
00542        int nattrs,
00543        Attribute *old
00544 )
00545 {
00546        char buf[65538], *ptr;
00547        Attribute **an, **ao, *a;
00548        NdbOperation *myop;
00549        int i, j, max = 0;
00550        int changed, rc;
00551        Uint64 eid = e->e_id;
00552 
00553        if ( !nattrs )
00554               return 0;
00555 
00556        an = (Attribute **)ch_malloc( 2 * nattrs * sizeof(Attribute *));
00557        ao = an + nattrs;
00558 
00559        /* Turn lists of attrs into arrays for easier access */
00560        for ( i=0; i<nattrs; i++ ) {
00561               if ( attrs[i]->na_oi != no ) {
00562                      an[i] = NULL;
00563                      ao[i] = NULL;
00564                      continue;
00565               }
00566               for ( a=e->e_attrs; a; a=a->a_next ) {
00567                      if ( a->a_desc == slap_schema.si_ad_objectClass )
00568                             continue;
00569                      if ( a->a_desc->ad_type == attrs[i]->na_attr ) {
00570                             /* Don't process same attr twice */
00571                             if ( a->a_flags & SLAP_ATTR_IXADD )
00572                                    a = NULL;
00573                             else
00574                                    a->a_flags |= SLAP_ATTR_IXADD;
00575                             break;
00576                      }
00577               }
00578               an[i] = a;
00579               if ( a && a->a_numvals > max )
00580                      max = a->a_numvals;
00581               for ( a=old; a; a=a->a_next ) {
00582                      if ( a->a_desc == slap_schema.si_ad_objectClass )
00583                             continue;
00584                      if ( a->a_desc->ad_type == attrs[i]->na_attr )
00585                             break;
00586               }
00587               ao[i] = a;
00588               if ( a && a->a_numvals > max )
00589                      max = a->a_numvals;
00590        }
00591 
00592        for ( i=0; i<max; i++ ) {
00593               myop = NULL;
00594               for ( j=0; j<nattrs; j++ ) {
00595                      if ( !an[j] && !ao[j] )
00596                             continue;
00597                      changed = 0;
00598                      if ( an[j] && an[j]->a_numvals > i ) {
00599                             /* both old and new are present, compare for changes */
00600                             if ( ao[j] && ao[j]->a_numvals > i ) {
00601                                    if ( ber_bvcmp( &ao[j]->a_nvals[i], &an[j]->a_nvals[i] ))
00602                                           changed = V_REP;
00603                             } else {
00604                                    changed = V_INS;
00605                             }
00606                      } else {
00607                             if ( ao[j] && ao[j]->a_numvals > i )
00608                                    changed = V_DEL;
00609                      }
00610                      if ( changed ) {
00611                             if ( !myop ) {
00612                                    rc = LDAP_OTHER;
00613                                    myop = txn->getNdbOperation( myTable );
00614                                    if ( !myop ) {
00615                                           goto done;
00616                                    }
00617                                    if ( old ) {
00618                                           if ( myop->writeTuple()) {
00619                                                  goto done;
00620                                           }
00621                                    } else {
00622                                           if ( myop->insertTuple()) {
00623                                                  goto done;
00624                                           }
00625                                    }
00626                                    if ( myop->equal( EID_COLUMN, eid )) {
00627                                           goto done;
00628                                    }
00629                                    if ( myop->equal( VID_COLUMN, i )) {
00630                                           goto done;
00631                                    }
00632                             }
00633                             if ( attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
00634                                    NdbBlob *myBlob = myop->getBlobHandle( attrs[j]->na_column );
00635                                    rc = LDAP_OTHER;
00636                                    if ( !myBlob ) {
00637                                           Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: getBlobHandle failed %s (%d)\n",
00638                                                  myop->getNdbError().message, myop->getNdbError().code, 0 );
00639                                           goto done;
00640                                    }
00641                                    if ( slapMode & SLAP_TOOL_MODE )
00642                                           ndb_flush_blobs = 1;
00643                                    if ( changed & V_INS ) {
00644                                           if ( myBlob->setValue( an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len )) {
00645                                                  Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
00646                                                         myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
00647                                                  goto done;
00648                                           }
00649                                    } else {
00650                                           if ( myBlob->setValue( NULL, 0 )) {
00651                                                  Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
00652                                                         myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
00653                                                  goto done;
00654                                           }
00655                                    }
00656                             } else {
00657                                    if ( changed & V_INS ) {
00658                                           if ( an[j]->a_vals[i].bv_len > attrs[j]->na_len ) {
00659                                                  Debug( LDAP_DEBUG_ANY, "ndb_oc_attrs: attribute %s too long for column\n",
00660                                                         attrs[j]->na_name.bv_val, 0, 0 );
00661                                                  rc = LDAP_CONSTRAINT_VIOLATION;
00662                                                  goto done;
00663                                           }
00664                                           ptr = buf;
00665                                           *ptr++ = an[j]->a_vals[i].bv_len & 0xff;
00666                                           if ( attrs[j]->na_len > 255 ) {
00667                                                  /* MedVar */
00668                                                  *ptr++ = an[j]->a_vals[i].bv_len >> 8;
00669                                           }
00670                                           memcpy( ptr, an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len );
00671                                           ptr = buf;
00672                                    } else {
00673                                           ptr = NULL;
00674                                    }
00675                                    if ( myop->setValue( attrs[j]->na_column, ptr )) {
00676                                           rc = LDAP_OTHER;
00677                                           goto done;
00678                                    }
00679                             }
00680                      }
00681               }
00682        }
00683        rc = LDAP_SUCCESS;
00684 done:
00685        ch_free( an );
00686        if ( rc ) {
00687               Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: failed %s (%d)\n",
00688                      myop->getNdbError().message, myop->getNdbError().code, 0 );
00689        }
00690        return rc;
00691 }
00692 
00693 static int
00694 ndb_oc_put(
00695        const NdbDictionary::Dictionary *myDict,
00696        NdbTransaction *txn, NdbOcInfo *no, Entry *e )
00697 {
00698        const NdbDictionary::Table *myTable;
00699        int i, rc;
00700 
00701        for ( i=0; i<no->no_nsets; i++ ) {
00702               rc = ndb_oc_put( myDict, txn, no->no_sets[i], e );
00703               if ( rc )
00704                      return rc;
00705        }
00706 
00707        myTable = myDict->getTable( no->no_table.bv_val );
00708        if ( !myTable )
00709               return LDAP_OTHER;
00710 
00711        return ndb_oc_attrs( txn, myTable, e, no, no->no_attrs, no->no_nattrs, NULL );
00712 }
00713 
00714 /* This is now only used for Adds. Modifies call ndb_oc_attrs directly. */
00715 extern "C" int
00716 ndb_entry_put_data(
00717        BackendDB *be,
00718        NdbArgs *NA
00719 )
00720 {
00721        struct ndb_info *ni = (struct ndb_info *) be->be_private;
00722        Attribute *aoc;
00723        const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
00724        NdbOcs myOcs;
00725        int i, rc;
00726 
00727        /* Get the entry's objectClass attribute */
00728        aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
00729        if ( !aoc )
00730               return LDAP_OTHER;
00731 
00732        ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
00733        myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
00734 
00735        /* Walk thru objectclasses, find all the attributes belonging to a class */
00736        for ( i=0; i<myOcs.no_ninfo; i++ ) {
00737               rc = ndb_oc_put( myDict, NA->txn, myOcs.no_info[i], NA->e );
00738               if ( rc ) return rc;
00739        }
00740 
00741        /* slapadd tries to batch multiple entries per txn, but entry data is
00742         * transient and blob data is required to remain valid for the whole txn.
00743         * So we need to flush blobs before their source data disappears.
00744         */
00745        if (( slapMode & SLAP_TOOL_MODE ) && ndb_flush_blobs )
00746               NA->txn->execute( NdbTransaction::NoCommit );
00747 
00748        return 0;
00749 }
00750 
00751 static void
00752 ndb_oc_get( Operation *op, NdbOcInfo *no, int *j, int *nocs, NdbOcInfo ***oclist )
00753 {
00754        int i;
00755        NdbOcInfo  **ol2;
00756 
00757        for ( i=0; i<no->no_nsets; i++ ) {
00758               ndb_oc_get( op, no->no_sets[i], j, nocs, oclist );
00759        }
00760 
00761        /* Don't insert twice */
00762        ol2 = *oclist;
00763        for ( i=0; i<*j; i++ )
00764               if ( ol2[i] == no )
00765                      return;
00766 
00767        if ( *j >= *nocs ) {
00768               *nocs *= 2;
00769               ol2 = (NdbOcInfo **)op->o_tmprealloc( *oclist, *nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
00770               *oclist = ol2;
00771        }
00772        ol2 = *oclist;
00773        ol2[(*j)++] = no;
00774 }
00775 
00776 /* Retrieve attribute data for given entry. The entry's DN and eid should
00777  * already be populated.
00778  */
00779 extern "C" int
00780 ndb_entry_get_data(
00781        Operation *op,
00782        NdbArgs *NA,
00783        int update
00784 )
00785 {
00786        struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
00787        const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
00788        const NdbDictionary::Table *myTable;
00789        NdbIndexScanOperation **myop = NULL;
00790        Uint64 eid;
00791 
00792        Attribute *a;
00793        NdbOcs myOcs;
00794        NdbOcInfo *oci, **oclist = NULL;
00795        char abuf[65536], *ptr, **attrs = NULL;
00796        struct berval bv[2];
00797        int *ocx = NULL;
00798 
00799        /* FIXME: abuf should be dynamically allocated */
00800 
00801        int i, j, k, nocs, nattrs, rc = LDAP_OTHER;
00802 
00803        eid = NA->e->e_id;
00804 
00805        ndb_oc_check( op->o_bd, NA->ndb, NA->ocs, &myOcs );
00806        myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
00807        nocs = myOcs.no_ninfo;
00808 
00809        oclist = (NdbOcInfo **)op->o_tmpcalloc( 1, nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
00810 
00811        for ( i=0, j=0; i<myOcs.no_ninfo; i++ ) {
00812               ndb_oc_get( op, myOcs.no_info[i], &j, &nocs, &oclist );
00813        }
00814 
00815        nocs = j;
00816        nattrs = 0;
00817        for ( i=0; i<nocs; i++ )
00818               nattrs += oclist[i]->no_nattrs;
00819 
00820        ocx = (int *)op->o_tmpalloc( nocs * sizeof(int), op->o_tmpmemctx );
00821 
00822        attrs = (char **)op->o_tmpalloc( nattrs * sizeof(char *), op->o_tmpmemctx );
00823 
00824        myop = (NdbIndexScanOperation **)op->o_tmpalloc( nattrs * sizeof(NdbIndexScanOperation *), op->o_tmpmemctx );
00825 
00826        k = 0;
00827        ptr = abuf;
00828        for ( i=0; i<nocs; i++ ) {
00829               oci = oclist[i];
00830 
00831               myop[i] = NA->txn->getNdbIndexScanOperation( "PRIMARY", oci->no_table.bv_val );
00832               if ( !myop[i] )
00833                      goto leave;
00834               if ( myop[i]->readTuples( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead ))
00835                      goto leave;
00836               if ( myop[i]->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
00837                      goto leave;
00838 
00839               for ( j=0; j<oci->no_nattrs; j++ ) {
00840                      if ( oci->no_attrs[j]->na_oi != oci )
00841                             continue;
00842                      if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
00843                             NdbBlob *bi = myop[i]->getBlobHandle( oci->no_attrs[j]->na_column );
00844                             attrs[k++] = (char *)bi;
00845                      } else {
00846                             attrs[k] = ptr;
00847                             *ptr++ = 0;
00848                             if ( oci->no_attrs[j]->na_len > 255 )
00849                                    *ptr++ = 0;
00850                             ptr += oci->no_attrs[j]->na_len + 1;
00851                             myop[i]->getValue( oci->no_attrs[j]->na_column, attrs[k++] );
00852                      }
00853               }
00854               ocx[i] = k;
00855        }
00856        /* Must use IgnoreError, because an entry with multiple objectClasses may not
00857         * actually have attributes defined in each class / table.
00858         */
00859        if ( NA->txn->execute( NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 )
00860               goto leave;
00861 
00862        /* count results */
00863        for ( i=0; i<nocs; i++ ) {
00864               if (( j = myop[i]->nextResult(true) )) {
00865                      if ( j < 0 ) {
00866                             Debug( LDAP_DEBUG_TRACE,
00867                                    "ndb_entry_get_data: first nextResult(%d) failed: %s (%d)\n",
00868                                    i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
00869                      }
00870                      myop[i] = NULL;
00871               }
00872        }
00873 
00874        nattrs = 0;
00875        k = 0;
00876        for ( i=0; i<nocs; i++ ) {
00877               oci = oclist[i];
00878               for ( j=0; j<oci->no_nattrs; j++ ) {
00879                      unsigned char *buf;
00880                      int len;
00881                      if ( oci->no_attrs[j]->na_oi != oci )
00882                             continue;
00883                      if ( !myop[i] ) {
00884                             attrs[k] = NULL;
00885                      } else if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
00886                             void *vi = attrs[k];
00887                             NdbBlob *bi = (NdbBlob *)vi;
00888                             int isNull;
00889                             bi->getNull( isNull );
00890                             if ( !isNull ) {
00891                                    nattrs++;
00892                             } else {
00893                                    attrs[k] = NULL;
00894                             }
00895                      } else {
00896                             buf = (unsigned char *)attrs[k];
00897                             len = buf[0];
00898                             if ( oci->no_attrs[j]->na_len > 255 ) {
00899                                    /* MedVar */
00900                                    len |= (buf[1] << 8);
00901                             }
00902                             if ( len ) {
00903                                    nattrs++;
00904                             } else {
00905                                    attrs[k] = NULL;
00906                             }
00907                      }
00908                      k++;
00909               }
00910        }
00911 
00912        a = attrs_alloc( nattrs+1 );
00913        NA->e->e_attrs = a;
00914 
00915        a->a_desc = slap_schema.si_ad_objectClass;
00916        a->a_vals = NULL;
00917        ber_bvarray_dup_x( &a->a_vals, NA->ocs, NULL );
00918        a->a_nvals = a->a_vals;
00919        a->a_numvals = myOcs.no_ntext;
00920 
00921        BER_BVZERO( &bv[1] );
00922 
00923        do {
00924               a = NA->e->e_attrs->a_next;
00925               k = 0;
00926               for ( i=0; i<nocs; k=ocx[i], i++ ) {
00927                      oci = oclist[i];
00928                      for ( j=0; j<oci->no_nattrs; j++ ) {
00929                             unsigned char *buf;
00930                             struct berval nbv;
00931                             if ( oci->no_attrs[j]->na_oi != oci )
00932                                    continue;
00933                             buf = (unsigned char *)attrs[k++];
00934                             if ( !buf )
00935                                    continue;
00936                             if ( !myop[i] ) {
00937                                    a=a->a_next;
00938                                    continue;
00939                             }
00940                             if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
00941                                    void *vi = (void *)buf;
00942                                    NdbBlob *bi = (NdbBlob *)vi;
00943                                    Uint64 len;
00944                                    Uint32 len2;
00945                                    int isNull;
00946                                    bi->getNull( isNull );
00947                                    if ( isNull ) {
00948                                           a = a->a_next;
00949                                           continue;
00950                                    }
00951                                    bi->getLength( len );
00952                                    bv[0].bv_len = len;
00953                                    bv[0].bv_val = (char *)ch_malloc( len+1 );
00954                                    len2 = len;
00955                                    if ( bi->readData( bv[0].bv_val, len2 )) {
00956                                           Debug( LDAP_DEBUG_TRACE,
00957                                                  "ndb_entry_get_data: blob readData failed: %s (%d), len %d\n",
00958                                                  bi->getNdbError().message, bi->getNdbError().code, len2 );
00959                                    }
00960                                    bv[0].bv_val[len] = '\0';
00961                                    ber_bvarray_add_x( &a->a_vals, bv, NULL );
00962                             } else {
00963                                    bv[0].bv_len = buf[0];
00964                                    if ( oci->no_attrs[j]->na_len > 255 ) {
00965                                           /* MedVar */
00966                                           bv[0].bv_len |= (buf[1] << 8);
00967                                           bv[0].bv_val = (char *)buf+2;
00968                                           buf[1] = 0;
00969                                    } else {
00970                                           bv[0].bv_val = (char *)buf+1;
00971                                    }
00972                                    buf[0] = 0;
00973                                    if ( bv[0].bv_len == 0 ) {
00974                                           a = a->a_next;
00975                                           continue;
00976                                    }
00977                                    bv[0].bv_val[bv[0].bv_len] = '\0';
00978                                    value_add_one( &a->a_vals, bv );
00979                             }
00980                             a->a_desc = oci->no_attrs[j]->na_desc;
00981                             attr_normalize_one( a->a_desc, bv, &nbv, NULL );
00982                             a->a_numvals++;
00983                             if ( !BER_BVISNULL( &nbv )) {
00984                                    ber_bvarray_add_x( &a->a_nvals, &nbv, NULL );
00985                             } else if ( !a->a_nvals ) {
00986                                    a->a_nvals = a->a_vals;
00987                             }
00988                             a = a->a_next;
00989                      }
00990               }
00991               k = 0;
00992               for ( i=0; i<nocs; i++ ) {
00993                      if ( !myop[i] )
00994                             continue;
00995                      if ((j = myop[i]->nextResult(true))) {
00996                             if ( j < 0 ) {
00997                                    Debug( LDAP_DEBUG_TRACE,
00998                                           "ndb_entry_get_data: last nextResult(%d) failed: %s (%d)\n",
00999                                           i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
01000                             }
01001                             myop[i] = NULL;
01002                      } else {
01003                             k = 1;
01004                      }
01005               }
01006        } while ( k );
01007 
01008        rc = 0;
01009 leave:
01010        if ( myop ) {
01011               op->o_tmpfree( myop, op->o_tmpmemctx );
01012        }
01013        if ( attrs ) {
01014               op->o_tmpfree( attrs, op->o_tmpmemctx );
01015        }
01016        if ( ocx ) {
01017               op->o_tmpfree( ocx, op->o_tmpmemctx );
01018        }
01019        if ( oclist ) {
01020               op->o_tmpfree( oclist, op->o_tmpmemctx );
01021        }
01022 
01023        return rc;
01024 }
01025 
01026 static int
01027 ndb_oc_del( 
01028        NdbTransaction *txn, Uint64 eid, NdbOcInfo *no )
01029 {
01030        NdbIndexScanOperation *myop;
01031        int i, rc;
01032 
01033        for ( i=0; i<no->no_nsets; i++ ) {
01034               rc = ndb_oc_del( txn, eid, no->no_sets[i] );
01035               if ( rc ) return rc;
01036        }
01037 
01038        myop = txn->getNdbIndexScanOperation( "PRIMARY", no->no_table.bv_val );
01039        if ( !myop )
01040               return LDAP_OTHER;
01041        if ( myop->readTuples( NdbOperation::LM_Exclusive ))
01042               return LDAP_OTHER;
01043        if ( myop->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
01044               return LDAP_OTHER;
01045 
01046        txn->execute(NoCommit);
01047        while ( myop->nextResult(true) == 0) {
01048               do {
01049                      myop->deleteCurrentTuple();
01050               } while (myop->nextResult(false) == 0);
01051               txn->execute(NoCommit);
01052        }
01053 
01054        return 0;
01055 }
01056 
01057 extern "C" int
01058 ndb_entry_del_data(
01059        BackendDB *be,
01060        NdbArgs *NA
01061 )
01062 {
01063        struct ndb_info *ni = (struct ndb_info *) be->be_private;
01064        Uint64 eid = NA->e->e_id;
01065        int i;
01066        NdbOcs myOcs;
01067 
01068        ndb_oc_check( be, NA->ndb, NA->ocs, &myOcs );
01069        myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
01070 
01071        for ( i=0; i<myOcs.no_ninfo; i++ ) {
01072               if ( ndb_oc_del( NA->txn, eid, myOcs.no_info[i] ))
01073                      return LDAP_OTHER;
01074        }
01075 
01076        return 0;
01077 }
01078 
01079 extern "C" int
01080 ndb_dn2rdns(
01081        struct berval *dn,
01082        NdbRdns *rdns
01083 )
01084 {
01085        char *beg, *end;
01086        int i, len;
01087 
01088        /* Walk thru RDNs */
01089        end = dn->bv_val + dn->bv_len;
01090        for ( i=0; i<NDB_MAX_RDNS; i++ ) {
01091               for ( beg = end-1; beg > dn->bv_val; beg-- ) {
01092                      if (*beg == ',') {
01093                             beg++;
01094                             break;
01095                      }
01096               }
01097               if ( beg >= dn->bv_val ) {
01098                      len = end - beg;
01099                      /* RDN is too long */
01100                      if ( len > NDB_RDN_LEN )
01101                             return LDAP_CONSTRAINT_VIOLATION;
01102                      memcpy( rdns->nr_buf[i]+1, beg, len );
01103               } else {
01104                      break;
01105               }
01106               rdns->nr_buf[i][0] = len;
01107               end = beg - 1;
01108        }
01109        /* Too many RDNs in DN */
01110        if ( i == NDB_MAX_RDNS && beg > dn->bv_val ) {
01111                      return LDAP_CONSTRAINT_VIOLATION;
01112        }
01113        rdns->nr_num = i;
01114        return 0;
01115 }
01116 
01117 static int
01118 ndb_rdns2keys(
01119        NdbOperation *myop,
01120        NdbRdns *rdns
01121 )
01122 {
01123        int i;
01124        char dummy[2] = {0,0};
01125 
01126        /* Walk thru RDNs */
01127        for ( i=0; i<rdns->nr_num; i++ ) {
01128               if ( myop->equal( i+RDN_COLUMN, rdns->nr_buf[i] ))
01129                      return LDAP_OTHER;
01130        }
01131        for ( ; i<NDB_MAX_RDNS; i++ ) {
01132               if ( myop->equal( i+RDN_COLUMN, dummy ))
01133                      return LDAP_OTHER;
01134        }
01135        return 0;
01136 }
01137 
01138 /* Store the DN2ID_TABLE fields */
01139 extern "C" int
01140 ndb_entry_put_info(
01141        BackendDB *be,
01142        NdbArgs *NA,
01143        int update
01144 )
01145 {
01146        struct ndb_info *ni = (struct ndb_info *) be->be_private;
01147        const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
01148        const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
01149        NdbOperation *myop;
01150        NdbAttrInfo *ai;
01151        Attribute *aoc, *a;
01152 
01153        /* Get the entry's objectClass attribute; it's ok to be
01154         * absent on a fresh insert
01155         */
01156        aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
01157        if ( update && !aoc )
01158               return LDAP_OBJECT_CLASS_VIOLATION;
01159 
01160        myop = NA->txn->getNdbOperation( myTable );
01161        if ( !myop )
01162               return LDAP_OTHER;
01163        if ( update ) {
01164               if ( myop->updateTuple())
01165                      return LDAP_OTHER;
01166        } else {
01167               if ( myop->insertTuple())
01168                      return LDAP_OTHER;
01169        }
01170 
01171        if ( ndb_rdns2keys( myop, NA->rdns ))
01172               return LDAP_OTHER;
01173 
01174        /* Set entry ID */
01175        {
01176               Uint64 eid = NA->e->e_id;
01177               if ( myop->setValue( EID_COLUMN, eid ))
01178                      return LDAP_OTHER;
01179        }
01180 
01181        /* Set list of objectClasses */
01182        /* List is <sp> <class> <sp> <class> <sp> ... so that
01183         * searches for " class " will yield accurate results
01184         */
01185        if ( aoc ) {
01186               char *ptr, buf[sizeof(MedVar)];
01187               NdbOcs myOcs;
01188               int i;
01189 
01190               ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
01191               ptr = buf+2;
01192               *ptr++ = ' ';
01193               for ( i=0; i<myOcs.no_ntext; i++ ) {
01194                      /* data loss... */
01195                      if ( ptr + myOcs.no_text[i].bv_len + 1 >= &buf[sizeof(buf)] )
01196                             break;
01197                      ptr = lutil_strcopy( ptr, myOcs.no_text[i].bv_val );
01198                      *ptr++ = ' ';
01199               }
01200 
01201               /* implicit classes */
01202               if ( myOcs.no_nitext ) {
01203                      *ptr++ = '@';
01204                      *ptr++ = ' ';
01205                      for ( i=0; i<myOcs.no_nitext; i++ ) {
01206                             /* data loss... */
01207                             if ( ptr + myOcs.no_itext[i].bv_len + 1 >= &buf[sizeof(buf)] )
01208                                    break;
01209                             ptr = lutil_strcopy( ptr, myOcs.no_itext[i].bv_val );
01210                             *ptr++ = ' ';
01211                      }
01212               }
01213 
01214               i = ptr - buf - 2;
01215               buf[0] = i & 0xff;
01216               buf[1] = i >> 8;
01217               if ( myop->setValue( OCS_COLUMN, buf ))
01218                      return LDAP_OTHER;
01219        }
01220 
01221        /* Set any indexed attrs */
01222        for ( a = NA->e->e_attrs; a; a=a->a_next ) {
01223               ai = ndb_ai_find( ni, a->a_desc->ad_type );
01224               if ( ai && ( ai->na_flag & NDB_INFO_INDEX )) {
01225                      char *ptr, buf[sizeof(MedVar)];
01226                      int len;
01227 
01228                      ptr = buf+1;
01229                      len = a->a_vals[0].bv_len;
01230                      /* FIXME: data loss */
01231                      if ( len > ai->na_len )
01232                             len = ai->na_len;
01233                      buf[0] = len & 0xff;
01234                      if ( ai->na_len > 255 ) {
01235                             *ptr++ = len >> 8;
01236                      }
01237                      memcpy( ptr, a->a_vals[0].bv_val, len );
01238                      if ( myop->setValue( ai->na_ixcol, buf ))
01239                             return LDAP_OTHER;
01240               }
01241        }
01242 
01243        return 0;
01244 }
01245 
01246 extern "C" struct berval *
01247 ndb_str2bvarray(
01248        char *str,
01249        int len,
01250        char delim,
01251        void *ctx
01252 )
01253 {
01254        struct berval *list, tmp;
01255        char *beg;
01256        int i, num;
01257 
01258        while ( *str == delim ) {
01259               str++;
01260               len--;
01261        }
01262 
01263        while ( str[len-1] == delim ) {
01264               str[--len] = '\0';
01265        }
01266 
01267        for ( i = 1, beg = str;; i++ ) {
01268               beg = strchr( beg, delim );
01269               if ( !beg )
01270                      break;
01271               if ( beg >= str + len )
01272                      break;
01273               beg++;
01274        }
01275 
01276        num = i;
01277        list = (struct berval *)slap_sl_malloc( (num+1)*sizeof(struct berval), ctx);
01278 
01279        for ( i = 0, beg = str; i<num; i++ ) {
01280               tmp.bv_val = beg;
01281               beg = strchr( beg, delim );
01282               if ( beg >= str + len )
01283                      beg = NULL;
01284               if ( beg ) {
01285                      tmp.bv_len = beg - tmp.bv_val;
01286               } else {
01287                      tmp.bv_len = len - (tmp.bv_val - str);
01288               }
01289               ber_dupbv_x( &list[i], &tmp, ctx );
01290               beg++;
01291        }
01292 
01293        BER_BVZERO( &list[i] );
01294        return list;
01295 }
01296 
01297 extern "C" struct berval *
01298 ndb_ref2oclist(
01299        const char *ref,
01300        void *ctx
01301 )
01302 {
01303        char *implied;
01304 
01305        /* MedVar */
01306        int len = ref[0] | (ref[1] << 8);
01307 
01308        /* don't return the implied classes */
01309        implied = (char *)memchr( ref+2, '@', len );
01310        if ( implied ) {
01311               len = implied - ref - 2;
01312               *implied = '\0';
01313        }
01314 
01315        return ndb_str2bvarray( (char *)ref+2, len, ' ', ctx );
01316 }
01317 
01318 /* Retrieve the DN2ID_TABLE fields. Can call with NULL ocs if just verifying
01319  * the existence of a DN.
01320  */
01321 extern "C" int
01322 ndb_entry_get_info(
01323        Operation *op,
01324        NdbArgs *NA,
01325        int update,
01326        struct berval *matched
01327 )
01328 {
01329        struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
01330        const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
01331        const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
01332        NdbOperation *myop[NDB_MAX_RDNS];
01333        NdbRecAttr *eid[NDB_MAX_RDNS], *oc[NDB_MAX_RDNS];
01334        char idbuf[NDB_MAX_RDNS][2*sizeof(ID)];
01335        char ocbuf[NDB_MAX_RDNS][NDB_OC_BUFLEN];
01336 
01337        if ( matched ) {
01338               BER_BVZERO( matched );
01339        }
01340        if ( !myTable ) {
01341               return LDAP_OTHER;
01342        }
01343 
01344        myop[0] = NA->txn->getNdbOperation( myTable );
01345        if ( !myop[0] ) {
01346               return LDAP_OTHER;
01347        }
01348 
01349        if ( myop[0]->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead )) {
01350               return LDAP_OTHER;
01351        }
01352 
01353        if ( !NA->rdns->nr_num && ndb_dn2rdns( &NA->e->e_name, NA->rdns )) {
01354               return LDAP_NO_SUCH_OBJECT;
01355        }
01356 
01357        if ( ndb_rdns2keys( myop[0], NA->rdns )) {
01358               return LDAP_OTHER;
01359        }
01360 
01361        eid[0] = myop[0]->getValue( EID_COLUMN, idbuf[0] );
01362        if ( !eid[0] ) {
01363               return LDAP_OTHER;
01364        }
01365 
01366        ocbuf[0][0] = 0;
01367        ocbuf[0][1] = 0;
01368        if ( !NA->ocs ) {
01369               oc[0] = myop[0]->getValue( OCS_COLUMN, ocbuf[0] );
01370               if ( !oc[0] ) {
01371                      return LDAP_OTHER;
01372               }
01373        }
01374 
01375        if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
01376               return LDAP_OTHER;
01377        }
01378 
01379        switch( myop[0]->getNdbError().code ) {
01380        case 0:
01381               if ( !eid[0]->isNULL() && ( NA->e->e_id = eid[0]->u_64_value() )) {
01382                      /* If we didn't care about OCs, or we got them */
01383                      if ( NA->ocs || ocbuf[0][0] || ocbuf[0][1] ) {
01384                             /* If wanted, return them */
01385                             if ( !NA->ocs )
01386                                    NA->ocs = ndb_ref2oclist( ocbuf[0], op->o_tmpmemctx );
01387                             break;
01388                      }
01389               }
01390               /* FALLTHRU */
01391        case NDB_NO_SUCH_OBJECT:    /* no such tuple: look for closest parent */
01392               if ( matched ) {
01393                      int i, j, k;
01394                      char dummy[2] = {0,0};
01395 
01396                      /* get to last RDN, then back up 1 */
01397                      k = NA->rdns->nr_num - 1;
01398 
01399                      for ( i=0; i<k; i++ ) {
01400                             myop[i] = NA->txn->getNdbOperation( myTable );
01401                             if ( !myop[i] )
01402                                    return LDAP_OTHER;
01403                             if ( myop[i]->readTuple( NdbOperation::LM_CommittedRead ))
01404                                    return LDAP_OTHER;
01405                             for ( j=0; j<=i; j++ ) {
01406                                    if ( myop[i]->equal( j+RDN_COLUMN, NA->rdns->nr_buf[j] ))
01407                                           return LDAP_OTHER;
01408                             }
01409                             for ( ;j<NDB_MAX_RDNS; j++ ) {
01410                                    if ( myop[i]->equal( j+RDN_COLUMN, dummy ))
01411                                           return LDAP_OTHER;
01412                             }
01413                             eid[i] = myop[i]->getValue( EID_COLUMN, idbuf[i] );
01414                             if ( !eid[i] ) {
01415                                    return LDAP_OTHER;
01416                             }
01417                             ocbuf[i][0] = 0;
01418                             ocbuf[i][1] = 0;
01419                             if ( !NA->ocs ) {
01420                                    oc[i] = myop[0]->getValue( OCS_COLUMN, ocbuf[i] );
01421                                    if ( !oc[i] ) {
01422                                           return LDAP_OTHER;
01423                                    }
01424                             }
01425                      }
01426                      if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
01427                             return LDAP_OTHER;
01428                      }
01429                      for ( --i; i>=0; i-- ) {
01430                             if ( myop[i]->getNdbError().code == 0 ) {
01431                                    for ( j=0; j<=i; j++ )
01432                                           matched->bv_len += NA->rdns->nr_buf[j][0];
01433                                    NA->erdns = NA->rdns->nr_num;
01434                                    NA->rdns->nr_num = j;
01435                                    matched->bv_len += i;
01436                                    matched->bv_val = NA->e->e_name.bv_val +
01437                                           NA->e->e_name.bv_len - matched->bv_len;
01438                                    if ( !eid[i]->isNULL() )
01439                                           NA->e->e_id = eid[i]->u_64_value();
01440                                    if ( !NA->ocs )
01441                                           NA->ocs = ndb_ref2oclist( ocbuf[i], op->o_tmpmemctx );
01442                                    break;
01443                             }
01444                      }
01445               }
01446               return LDAP_NO_SUCH_OBJECT;
01447        default:
01448               return LDAP_OTHER;
01449        }
01450 
01451        return 0;
01452 }
01453 
01454 extern "C" int
01455 ndb_entry_del_info(
01456        BackendDB *be,
01457        NdbArgs *NA
01458 )
01459 {
01460        struct ndb_info *ni = (struct ndb_info *) be->be_private;
01461        const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
01462        const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
01463        NdbOperation *myop;
01464 
01465        myop = NA->txn->getNdbOperation( myTable );
01466        if ( !myop )
01467               return LDAP_OTHER;
01468        if ( myop->deleteTuple())
01469               return LDAP_OTHER;
01470 
01471        if ( ndb_rdns2keys( myop, NA->rdns ))
01472               return LDAP_OTHER;
01473 
01474        return 0;
01475 }
01476 
01477 extern "C" int
01478 ndb_next_id(
01479        BackendDB *be,
01480        Ndb *ndb,
01481        ID *id
01482 )
01483 {
01484        struct ndb_info *ni = (struct ndb_info *) be->be_private;
01485        const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
01486        const NdbDictionary::Table *myTable = myDict->getTable( NEXTID_TABLE );
01487        Uint64 nid = 0;
01488        int rc;
01489 
01490        if ( !myTable ) {
01491               Debug( LDAP_DEBUG_ANY, "ndb_next_id: " NEXTID_TABLE " table is missing\n",
01492                      0, 0, 0 );
01493               return LDAP_OTHER;
01494        }
01495 
01496        rc = ndb->getAutoIncrementValue( myTable, nid, 1000 );
01497        if ( !rc )
01498               *id = nid;
01499        return rc;
01500 }
01501 
01502 extern "C" { static void ndb_thread_hfree( void *key, void *data ); };
01503 static void
01504 ndb_thread_hfree( void *key, void *data )
01505 {
01506        Ndb *ndb = (Ndb *)data;
01507        delete ndb;
01508 }
01509 
01510 extern "C" int
01511 ndb_thread_handle(
01512        Operation *op,
01513        Ndb **ndb )
01514 {
01515        struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
01516        void *data;
01517 
01518        if ( ldap_pvt_thread_pool_getkey( op->o_threadctx, ni, &data, NULL )) {
01519               Ndb *myNdb;
01520               int rc;
01521               ldap_pvt_thread_mutex_lock( &ni->ni_conn_mutex );
01522               myNdb = new Ndb( ni->ni_cluster[ni->ni_nextconn++], ni->ni_dbname );
01523               if ( ni->ni_nextconn >= ni->ni_nconns )
01524                      ni->ni_nextconn = 0;
01525               ldap_pvt_thread_mutex_unlock( &ni->ni_conn_mutex );
01526               if ( !myNdb ) {
01527                      return LDAP_OTHER;
01528               }
01529               rc = myNdb->init(1024);
01530               if ( rc ) {
01531                      delete myNdb;
01532                      Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
01533                             rc, 0, 0 );
01534                      return rc;
01535               }
01536               data = (void *)myNdb;
01537               if (( rc = ldap_pvt_thread_pool_setkey( op->o_threadctx, ni,
01538                      data, ndb_thread_hfree, NULL, NULL ))) {
01539                      delete myNdb;
01540                      Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
01541                             rc, 0, 0 );
01542                      return rc;
01543               }
01544        }
01545        *ndb = (Ndb *)data;
01546        return 0;
01547 }
01548 
01549 extern "C" int
01550 ndb_entry_get(
01551        Operation *op,
01552        struct berval *ndn,
01553        ObjectClass *oc,
01554        AttributeDescription *ad,
01555        int rw,
01556        Entry **ent )
01557 {
01558        struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
01559        NdbArgs NA;
01560        Entry e = {0};
01561        int rc;
01562 
01563        /* Get our NDB handle */
01564        rc = ndb_thread_handle( op, &NA.ndb );
01565 
01566        NA.txn = NA.ndb->startTransaction();
01567        if( !NA.txn ) {
01568               Debug( LDAP_DEBUG_TRACE,
01569                      LDAP_XSTRING(ndb_entry_get) ": startTransaction failed: %s (%d)\n",
01570                      NA.ndb->getNdbError().message, NA.ndb->getNdbError().code, 0 );
01571               return 1;
01572        }
01573 
01574        e.e_name = *ndn;
01575        NA.e = &e;
01576        /* get entry */
01577        {
01578               NdbRdns rdns;
01579               rdns.nr_num = 0;
01580               NA.ocs = NULL;
01581               NA.rdns = &rdns;
01582               rc = ndb_entry_get_info( op, &NA, rw, NULL );
01583        }
01584        if ( rc == 0 ) {
01585               e.e_name = *ndn;
01586               e.e_nname = *ndn;
01587               rc = ndb_entry_get_data( op, &NA, 0 );
01588               ber_bvarray_free( NA.ocs );
01589               if ( rc == 0 ) {
01590                      if ( oc && !is_entry_objectclass_or_sub( &e, oc )) {
01591                             attrs_free( e.e_attrs );
01592                             rc = 1;
01593                      }
01594               }
01595        }
01596        if ( rc == 0 ) {
01597               *ent = entry_alloc();
01598               **ent = e;
01599               ber_dupbv( &(*ent)->e_name, ndn );
01600               ber_dupbv( &(*ent)->e_nname, ndn );
01601        } else {
01602               rc = 1;
01603        }
01604        NA.txn->close();
01605        return rc;
01606 }
01607 
01608 /* Congestion avoidance code
01609  * for Deadlock Rollback
01610  */
01611 
01612 extern "C" void
01613 ndb_trans_backoff( int num_retries )
01614 {
01615        int i;
01616        int delay = 0;
01617        int pow_retries = 1;
01618        unsigned long key = 0;
01619        unsigned long max_key = -1;
01620        struct timeval timeout;
01621 
01622        lutil_entropy( (unsigned char *) &key, sizeof( unsigned long ));
01623 
01624        for ( i = 0; i < num_retries; i++ ) {
01625               if ( i >= 5 ) break;
01626               pow_retries *= 4;
01627        }
01628 
01629        delay = 16384 * (key * (double) pow_retries / (double) max_key);
01630        delay = delay ? delay : 1;
01631 
01632        Debug( LDAP_DEBUG_TRACE,  "delay = %d, num_retries = %d\n", delay, num_retries, 0 );
01633 
01634        timeout.tv_sec = delay / 1000000;
01635        timeout.tv_usec = delay % 1000000;
01636        select( 0, NULL, NULL, NULL, &timeout );
01637 }
01638 
01639 extern "C" void
01640 ndb_check_referral( Operation *op, SlapReply *rs, NdbArgs *NA )
01641 {
01642        struct berval dn, ndn;
01643        int i, dif;
01644        dif = NA->erdns - NA->rdns->nr_num;
01645 
01646        /* Set full DN of matched into entry */
01647        for ( i=0; i<dif; i++ ) {
01648               dnParent( &NA->e->e_name, &dn );
01649               dnParent( &NA->e->e_nname, &ndn );
01650               NA->e->e_name = dn;
01651               NA->e->e_nname = ndn;
01652        }
01653 
01654        /* return referral only if "disclose" is granted on the object */
01655        if ( access_allowed( op, NA->e, slap_schema.si_ad_entry,
01656               NULL, ACL_DISCLOSE, NULL )) {
01657               Attribute a;
01658               for ( i=0; !BER_BVISNULL( &NA->ocs[i] ); i++ );
01659               a.a_numvals = i;
01660               a.a_desc = slap_schema.si_ad_objectClass;
01661               a.a_vals = NA->ocs;
01662               a.a_nvals = NA->ocs;
01663               a.a_next = NULL;
01664               NA->e->e_attrs = &a;
01665               if ( is_entry_referral( NA->e )) {
01666                      NA->e->e_attrs = NULL;
01667                      ndb_entry_get_data( op, NA, 0 );
01668                      rs->sr_ref = get_entry_referrals( op, NA->e );
01669                      if ( rs->sr_ref ) {
01670                             rs->sr_err = LDAP_REFERRAL;
01671                             rs->sr_flags |= REP_REF_MUSTBEFREED;
01672                      }
01673                      attrs_free( NA->e->e_attrs );
01674               }
01675               NA->e->e_attrs = NULL;
01676        }
01677 }